diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 7777051ac1..4d422e7c57 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -1344,17 +1344,18 @@ object RemoteActorSystemMessage { * @author Jonas Bonér */ private[akka] case class RemoteActorRef private[akka] ( - uuuid: String, + classOrServiceName: String, val className: String, val hostname: String, val port: Int, _timeout: Long, - loader: Option[ClassLoader]) + loader: Option[ClassLoader], + val actorType: ActorType = ActorType.ScalaActor) extends ActorRef with ScalaActorRef { ensureRemotingEnabled - _uuid = uuuid + id = classOrServiceName timeout = _timeout start @@ -1362,7 +1363,7 @@ private[akka] case class RemoteActorRef private[akka] ( def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = RemoteClientModule.send[Any]( - message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor) + message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType) def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, @@ -1370,7 +1371,7 @@ private[akka] case class RemoteActorRef private[akka] ( senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { val future = RemoteClientModule.send[T]( - message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, ActorType.ScalaActor) + message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index 264081259f..52585a491c 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{HashSet, HashMap} import scala.reflect.BeanProperty +import se.scalablesolutions.akka.actor._ /** * Atomic remote request/reply message id generator. @@ -76,8 +77,6 @@ object RemoteClient extends Logging { private val remoteClients = new HashMap[String, RemoteClient] private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]] - // FIXME: simplify overloaded methods when we have Scala 2.8 - def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None) @@ -99,6 +98,27 @@ object RemoteClient extends Logging { def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = RemoteActorRef(serviceId, className, hostname, port, timeout, None) + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int) : T = { + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, 5000L, hostname, port, None) + } + + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int) : T = { + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None) + } + + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = { + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader)) + } + + def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = { + typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) + } + + private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]) : T = { + val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, ActorType.TypedActor) + TypedActor.createProxyForRemoteActorRef(intfClass, actorRef) + } + private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader)) diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 27b5a50bfc..b6fd275188 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -10,7 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors} import java.util.{Map => JMap} import se.scalablesolutions.akka.actor.{ - Actor, TypedActor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, RemoteActorSystemMessage} + Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.util._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ @@ -133,8 +133,8 @@ object RemoteServer { actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) } - private[akka] def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef) = guard.withWriteGuard { - actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(name, typedActor) + private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard { + actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor) } private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard { @@ -271,12 +271,28 @@ class RemoteServer extends Logging with ListenerManagement { } } - // TODO: register typed actor in RemoteServer as well + /** + * Register typed actor by interface name. + */ + def registerTypedActor(intfClass: Class[_], typedActor: AnyRef) : Unit = registerTypedActor(intfClass.getName, typedActor) /** - * Register Remote Actor by the Actor's 'uuid' field. It starts the Actor if it is not started already. + * Register remote typed actor by a specific id. + * @param id custom actor id + * @param typedActor typed actor to register */ - def register(actorRef: ActorRef): Unit = register(actorRef.id,actorRef) + def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized { + val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors + if (!typedActors.contains(id)) { + log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port) + typedActors.put(id, typedActor) + } + } + + /** + * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already. + */ + def register(actorRef: ActorRef): Unit = register(actorRef.id, actorRef) /** * Register Remote Actor by a specific 'id' passed as argument. @@ -321,16 +337,25 @@ class RemoteServer extends Logging with ListenerManagement { } } + /** + * Unregister Remote Typed Actor by specific 'id'. + *

+ * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregisterTypedActor(id: String):Unit = synchronized { + if (_isRunning) { + log.info("Unregistering server side remote typed actor with id [%s]", id) + val registeredTypedActors = typedActors() + registeredTypedActors.remove(id) + } + } + protected override def manageLifeCycleOfListeners = false protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) - private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = { - RemoteServer.actorsFor(address).actors - } - private[akka] def typedActors() : ConcurrentHashMap[String, AnyRef] = { - RemoteServer.actorsFor(address).typedActors - } + private[akka] def actors() = RemoteServer.actorsFor(address).actors + private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors } object RemoteServerSslContext { @@ -530,6 +555,32 @@ class RemoteServerHandler( } } + /** + * Find a registered actor by ID (default) or UUID. + * Actors are registered by id apart from registering during serialization see SerializationProtocol. + */ + private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = { + val registeredActors = server.actors() + var actorRefOrNull = registeredActors get id + if (actorRefOrNull eq null) { + actorRefOrNull = registeredActors get uuid + } + actorRefOrNull + } + + /** + * Find a registered typed actor by ID (default) or UUID. + * Actors are registered by id apart from registering during serialization see SerializationProtocol. + */ + private def findTypedActorByIdOrUUid(id: String, uuid: String) : AnyRef = { + val registeredActors = server.typedActors() + var actorRefOrNull = registeredActors get id + if (actorRefOrNull eq null) { + actorRefOrNull = registeredActors get uuid + } + actorRefOrNull + } + /** * Creates a new instance of the actor with name, uuid and timeout specified as arguments. * @@ -538,12 +589,14 @@ class RemoteServerHandler( * Does not start the actor. */ private def createActor(actorInfo: ActorInfoProtocol): ActorRef = { - val uuid = actorInfo.getUuid + val ids = actorInfo.getUuid.split(':') + val uuid = ids(0) + val id = ids(1) + val name = actorInfo.getTarget val timeout = actorInfo.getTimeout - val registeredActors = server.actors() - val actorRefOrNull = registeredActors get uuid + val actorRefOrNull = findActorByIdOrUuid(id, uuid) if (actorRefOrNull eq null) { try { @@ -552,9 +605,10 @@ class RemoteServerHandler( else Class.forName(name) val actorRef = Actor.actorOf(clazz.newInstance.asInstanceOf[Actor]) actorRef.uuid = uuid + actorRef.id = id actorRef.timeout = timeout actorRef.remoteAddress = None - registeredActors.put(uuid, actorRef) + server.actors.put(id, actorRef) // register by id actorRef } catch { case e => @@ -566,9 +620,11 @@ class RemoteServerHandler( } private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = { - val uuid = actorInfo.getUuid - val registeredTypedActors = server.typedActors() - val typedActorOrNull = registeredTypedActors get uuid + val ids = actorInfo.getUuid.split(':') + val uuid = ids(0) + val id = ids(1) + + val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid) if (typedActorOrNull eq null) { val typedActorInfo = actorInfo.getTypedActorInfo @@ -585,7 +641,7 @@ class RemoteServerHandler( val newInstance = TypedActor.newInstance( interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef] - registeredTypedActors.put(uuid, newInstance) + server.typedActors.put(id, newInstance) // register by id newInstance } catch { case e => diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 7da001edab..afebae8f3b 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -230,7 +230,7 @@ object RemoteActorSerialization { } RemoteActorRefProtocol.newBuilder - .setUuid(uuid) + .setUuid(uuid + ":" + id) .setActorClassname(actorClass.getName) .setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build) .setTimeout(timeout) @@ -248,7 +248,7 @@ object RemoteActorSerialization { import actorRef._ val actorInfoBuilder = ActorInfoProtocol.newBuilder - .setUuid(uuid) + .setUuid(uuid + ":" + actorRef.id) .setTarget(actorClassName) .setTimeout(timeout) diff --git a/akka-remote/src/test/resources/META-INF/aop.xml b/akka-remote/src/test/resources/META-INF/aop.xml index bdc167ca54..be133a51b8 100644 --- a/akka-remote/src/test/resources/META-INF/aop.xml +++ b/akka-remote/src/test/resources/META-INF/aop.xml @@ -2,6 +2,7 @@ + diff --git a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala index 7ff46ab910..6670722b02 100644 --- a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala @@ -93,6 +93,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { actor.stop } + @Test def shouldSendOneWayAndReceiveReply = { val actor = actorOf[SendOneWayAndReplyReceiverActor] @@ -103,7 +104,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendTo = actor sender.start sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendOff - assert(SendOneWayAndReplySenderActor.latch.await(1, TimeUnit.SECONDS)) + assert(SendOneWayAndReplySenderActor.latch.await(3, TimeUnit.SECONDS)) assert(sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].state.isDefined === true) assert("World" === sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].state.get.asInstanceOf[String]) actor.stop @@ -134,6 +135,6 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { assert("Expected exception; to test fault-tolerance" === e.getMessage()) } actor.stop - } + } } diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala index 780828c310..8b28b35f57 100644 --- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -4,10 +4,7 @@ package se.scalablesolutions.akka.actor.remote -import org.scalatest.Spec -import org.scalatest.Assertions import org.scalatest.matchers.ShouldMatchers -import org.scalatest.BeforeAndAfterAll import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith @@ -19,6 +16,7 @@ import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} +import org.scalatest.{BeforeAndAfterEach, Spec, Assertions, BeforeAndAfterAll} object RemoteTypedActorSpec { val HOSTNAME = "localhost" @@ -40,7 +38,7 @@ object RemoteTypedActorLog { class RemoteTypedActorSpec extends Spec with ShouldMatchers with - BeforeAndAfterAll { + BeforeAndAfterEach with BeforeAndAfterAll { import RemoteTypedActorLog._ import RemoteTypedActorSpec._ @@ -82,6 +80,10 @@ class RemoteTypedActorSpec extends ActorRegistry.shutdownAll } + override def afterEach() { + server.typedActors.clear + } + describe("Remote Typed Actor ") { it("should receive one-way message") { diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 5baa799258..7a2676d040 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -79,6 +79,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } } + @Test def shouldSendWithBang { val actor = RemoteClient.actorFor( @@ -153,10 +154,29 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { server.register(actorOf[RemoteActorSpecActorUnidirectional]) val actor = RemoteClient.actorFor("se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT) val numberOfActorsInRegistry = ActorRegistry.actors.length - val result = actor ! "OneWay" + actor ! "OneWay" assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) assert(numberOfActorsInRegistry === ActorRegistry.actors.length) actor.stop } + + @Test + def shouldUseServiceNameAsIdForRemoteActorRef { + server.register(actorOf[RemoteActorSpecActorUnidirectional]) + server.register("my-service", actorOf[RemoteActorSpecActorUnidirectional]) + val actor1 = RemoteClient.actorFor("se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT) + val actor2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT) + val actor3 = RemoteClient.actorFor("my-service", HOSTNAME, PORT) + + actor1 ! "OneWay" + actor2 ! "OneWay" + actor3 ! "OneWay" + + assert(actor1.uuid != actor2.uuid) + assert(actor1.uuid != actor3.uuid) + assert(actor1.id != actor2.id) + assert(actor2.id == actor3.id) + } + } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala new file mode 100644 index 0000000000..b800fbf2c3 --- /dev/null +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala @@ -0,0 +1,112 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor.remote + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import java.util.concurrent.TimeUnit + +import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} +import se.scalablesolutions.akka.actor._ +import RemoteTypedActorLog._ + +object ServerInitiatedRemoteTypedActorSpec { + val HOSTNAME = "localhost" + val PORT = 9990 + var server: RemoteServer = null +} + +@RunWith(classOf[JUnitRunner]) +class ServerInitiatedRemoteTypedActorSpec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + import ServerInitiatedRemoteTypedActorSpec._ + + private val unit = TimeUnit.MILLISECONDS + + + override def beforeAll = { + server = new RemoteServer() + server.start(HOSTNAME, PORT) + + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + server.registerTypedActor("typed-actor-service", typedActor) + + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterAll = { + try { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + describe("Server managed remote typed Actor ") { + + it("should receive one-way message") { + clearMessageLogs + val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + } + + it("should respond to request-reply message") { + clearMessageLogs + val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) + expect("pong") { + actor.requestReply("ping") + } + } + + it("should not recreate registered actors") { + val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) + val numberOfActorsInRegistry = ActorRegistry.actors.length + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + assert(numberOfActorsInRegistry === ActorRegistry.actors.length) + } + + it("should support multiple variants to get the actor from client side") { + var actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", HOSTNAME, PORT) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT, this.getClass().getClassLoader) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + } + + it("should register and unregister typed actors") { + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + server.registerTypedActor("my-test-service", typedActor) + assert(server.typedActors().get("my-test-service") != null) + server.unregisterTypedActor("my-test-service") + assert(server.typedActors().get("my-test-service") == null) + } + } +} + diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd index cf3c8ffafc..84a382a78e 100644 --- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd +++ b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd @@ -64,6 +64,14 @@ + + + + + + + + @@ -105,6 +113,20 @@ + + + + Management type for remote actors: client managed or server managed. + + + + + + + Custom service name for server managed actor. + + + @@ -133,7 +155,7 @@ - Theh default timeout for '!!' invocations. + The default timeout for '!!' invocations. @@ -227,6 +249,41 @@ + + + + + + + + Name of the remote host. + + + + + + + Port of the remote host. + + + + + + + Custom service name or class name for the server managed actor. + + + + + + + Name of the interface the typed actor implements. + + + + + + @@ -292,4 +349,7 @@ + + + diff --git a/akka-spring/src/main/scala/ActorBeanDefinitionParser.scala b/akka-spring/src/main/scala/ActorBeanDefinitionParser.scala new file mode 100644 index 0000000000..55aa82b8e4 --- /dev/null +++ b/akka-spring/src/main/scala/ActorBeanDefinitionParser.scala @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring + +import org.springframework.beans.factory.support.BeanDefinitionBuilder +import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser +import org.springframework.beans.factory.xml.ParserContext +import AkkaSpringConfigurationTags._ +import org.w3c.dom.Element + + +/** + * Parser for custom namespace configuration. + * @author michaelkober + */ +class TypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser { + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) + */ + override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { + val typedActorConf = parseActor(element) + typedActorConf.typed = TYPED_ACTOR_TAG + typedActorConf.setAsProperties(builder) + } + + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) + */ + override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean] +} + + +/** + * Parser for custom namespace configuration. + * @author michaelkober + */ +class UntypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser { + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) + */ + override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { + val untypedActorConf = parseActor(element) + untypedActorConf.typed = UNTYPED_ACTOR_TAG + untypedActorConf.setAsProperties(builder) + } + + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) + */ + override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean] +} + + +/** + * Parser for custom namespace configuration. + * @author michaelkober + */ +class ActorForBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorForParser { + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) + */ + override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { + val actorForConf = parseActorFor(element) + actorForConf.setAsProperties(builder) + } + + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) + */ + override def getBeanClass(element: Element): Class[_] = classOf[ActorForFactoryBean] +} diff --git a/akka-spring/src/main/scala/ActorFactoryBean.scala b/akka-spring/src/main/scala/ActorFactoryBean.scala index ee4b370b4f..caa344825a 100644 --- a/akka-spring/src/main/scala/ActorFactoryBean.scala +++ b/akka-spring/src/main/scala/ActorFactoryBean.scala @@ -4,22 +4,19 @@ package se.scalablesolutions.akka.spring -import java.beans.PropertyDescriptor -import java.lang.reflect.Method -import javax.annotation.PreDestroy -import javax.annotation.PostConstruct - import org.springframework.beans.{BeanUtils,BeansException,BeanWrapper,BeanWrapperImpl} -import org.springframework.beans.factory.BeanFactory +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer} +//import org.springframework.beans.factory.BeanFactory import org.springframework.beans.factory.config.AbstractFactoryBean import org.springframework.context.{ApplicationContext,ApplicationContextAware} -import org.springframework.util.ReflectionUtils +//import org.springframework.util.ReflectionUtils import org.springframework.util.StringUtils import se.scalablesolutions.akka.actor.{ActorRef, AspectInitRegistry, TypedActorConfiguration, TypedActor,Actor} import se.scalablesolutions.akka.dispatch.MessageDispatcher import se.scalablesolutions.akka.util.{Logging, Duration} import scala.reflect.BeanProperty +import java.net.InetSocketAddress /** * Exception to use when something goes wrong during bean creation. @@ -49,6 +46,8 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App @BeanProperty var transactional: Boolean = false @BeanProperty var host: String = "" @BeanProperty var port: Int = _ + @BeanProperty var serverManaged: Boolean = false + @BeanProperty var serviceName: String = "" @BeanProperty var lifecycle: String = "" @BeanProperty var dispatcher: DispatcherProperties = _ @BeanProperty var scope: String = VAL_SCOPE_SINGLETON @@ -94,7 +93,16 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App if (implementation == null || implementation == "") throw new AkkaBeansException( "The 'implementation' part of the 'akka:typed-actor' element in the Spring config file can't be null or empty string") - TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig) + val typedActor: AnyRef = TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig) + if (isRemote && serverManaged) { + val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port)) + if (serviceName.isEmpty) { + server.registerTypedActor(interface, typedActor) + } else { + server.registerTypedActor(serviceName, typedActor) + } + } + typedActor } /** @@ -111,7 +119,16 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App actorRef.makeTransactionRequired } if (isRemote) { - actorRef.makeRemote(host, port) + if (serverManaged) { + val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port)) + if (serviceName.isEmpty) { + server.register(actorRef) + } else { + server.register(serviceName, actorRef) + } + } else { + actorRef.makeRemote(host, port) + } } if (hasDispatcher) { if (dispatcher.dispatcherType != THREAD_BASED){ @@ -159,7 +176,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App private[akka] def createConfig: TypedActorConfiguration = { val config = new TypedActorConfiguration().timeout(Duration(timeout, "millis")) if (transactional) config.makeTransactionRequired - if (isRemote) config.makeRemote(host, port) + if (isRemote && !serverManaged) config.makeRemote(host, port) if (hasDispatcher) { if (dispatcher.dispatcherType != THREAD_BASED) { config.dispatcher(dispatcherInstance()) @@ -191,3 +208,39 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App } } } + +/** + * Factory bean for remote client actor-for. + * + * @author michaelkober + */ +class ActorForFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with ApplicationContextAware { + import StringReflect._ + import AkkaSpringConfigurationTags._ + + @BeanProperty var interface: String = "" + @BeanProperty var host: String = "" + @BeanProperty var port: Int = _ + @BeanProperty var serviceName: String = "" + //@BeanProperty var scope: String = VAL_SCOPE_SINGLETON + @BeanProperty var applicationContext: ApplicationContext = _ + + override def isSingleton = false + + /* + * @see org.springframework.beans.factory.FactoryBean#getObjectType() + */ + def getObjectType: Class[AnyRef] = classOf[AnyRef] + + /* + * @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance() + */ + def createInstance: AnyRef = { + if (interface.isEmpty) { + RemoteClient.actorFor(serviceName, host, port) + } else { + RemoteClient.typedActorFor(interface.toClass, serviceName, host, port) + } + } +} + diff --git a/akka-spring/src/main/scala/ActorParser.scala b/akka-spring/src/main/scala/ActorParser.scala index 69073bd52f..9858e9ad7e 100644 --- a/akka-spring/src/main/scala/ActorParser.scala +++ b/akka-spring/src/main/scala/ActorParser.scala @@ -6,6 +6,7 @@ package se.scalablesolutions.akka.spring import org.springframework.util.xml.DomUtils import org.w3c.dom.Element import scala.collection.JavaConversions._ +import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.actor.IllegalActorStateException @@ -27,11 +28,17 @@ trait ActorParser extends BeanParser with DispatcherParser { val objectProperties = new ActorProperties() val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG); val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG) - val propertyEntries = DomUtils.getChildElementsByTagName(element,PROPERTYENTRY_TAG) + val propertyEntries = DomUtils.getChildElementsByTagName(element, PROPERTYENTRY_TAG) if (remoteElement != null) { objectProperties.host = mandatory(remoteElement, HOST) objectProperties.port = mandatory(remoteElement, PORT).toInt + objectProperties.serverManaged = (remoteElement.getAttribute(MANAGED_BY) != null) && (remoteElement.getAttribute(MANAGED_BY).equals(SERVER_MANAGED)) + val serviceName = remoteElement.getAttribute(SERVICE_NAME) + if ((serviceName != null) && (!serviceName.isEmpty)) { + objectProperties.serviceName = serviceName + objectProperties.serverManaged = true + } } if (dispatcherElement != null) { @@ -43,7 +50,7 @@ trait ActorParser extends BeanParser with DispatcherParser { val entry = new PropertyEntry entry.name = element.getAttribute("name"); entry.value = element.getAttribute("value") - entry.ref = element.getAttribute("ref") + entry.ref = element.getAttribute("ref") objectProperties.propertyEntries.add(entry) } @@ -59,15 +66,13 @@ trait ActorParser extends BeanParser with DispatcherParser { objectProperties.target = mandatory(element, IMPLEMENTATION) objectProperties.transactional = if (element.getAttribute(TRANSACTIONAL).isEmpty) false else element.getAttribute(TRANSACTIONAL).toBoolean - if (!element.getAttribute(INTERFACE).isEmpty) { + if (element.hasAttribute(INTERFACE)) { objectProperties.interface = element.getAttribute(INTERFACE) } - - if (!element.getAttribute(LIFECYCLE).isEmpty) { + if (element.hasAttribute(LIFECYCLE)) { objectProperties.lifecycle = element.getAttribute(LIFECYCLE) } - - if (!element.getAttribute(SCOPE).isEmpty) { + if (element.hasAttribute(SCOPE)) { objectProperties.scope = element.getAttribute(SCOPE) } @@ -75,3 +80,158 @@ trait ActorParser extends BeanParser with DispatcherParser { } } + +/** + * Parser trait for custom namespace configuration for RemoteClient actor-for. + * @author michaelkober + */ +trait ActorForParser extends BeanParser { + import AkkaSpringConfigurationTags._ + + /** + * Parses the given element and returns a ActorForProperties. + * @param element dom element to parse + * @return configuration for the typed actor + */ + def parseActorFor(element: Element): ActorForProperties = { + val objectProperties = new ActorForProperties() + + objectProperties.host = mandatory(element, HOST) + objectProperties.port = mandatory(element, PORT).toInt + objectProperties.serviceName = mandatory(element, SERVICE_NAME) + if (element.hasAttribute(INTERFACE)) { + objectProperties.interface = element.getAttribute(INTERFACE) + } + objectProperties + } + +} + +/** + * Base trait with utility methods for bean parsing. + */ +trait BeanParser extends Logging { + + /** + * Get a mandatory element attribute. + * @param element the element with the mandatory attribute + * @param attribute name of the mandatory attribute + */ + def mandatory(element: Element, attribute: String): String = { + if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) { + throw new IllegalArgumentException("Mandatory attribute missing: " + attribute) + } else { + element.getAttribute(attribute) + } + } + + /** + * Get a mandatory child element. + * @param element the parent element + * @param childName name of the mandatory child element + */ + def mandatoryElement(element: Element, childName: String): Element = { + val childElement = DomUtils.getChildElementByTagName(element, childName); + if (childElement == null) { + throw new IllegalArgumentException("Mandatory element missing: ''") + } else { + childElement + } + } + +} + + +/** + * Parser trait for custom namespace for Akka dispatcher configuration. + * @author michaelkober + */ +trait DispatcherParser extends BeanParser { + import AkkaSpringConfigurationTags._ + + /** + * Parses the given element and returns a DispatcherProperties. + * @param element dom element to parse + * @return configuration for the dispatcher + */ + def parseDispatcher(element: Element): DispatcherProperties = { + val properties = new DispatcherProperties() + var dispatcherElement = element + if (hasRef(element)) { + val ref = element.getAttribute(REF) + dispatcherElement = element.getOwnerDocument.getElementById(ref) + if (dispatcherElement == null) { + throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'") + } + } + + properties.dispatcherType = mandatory(dispatcherElement, TYPE) + if (properties.dispatcherType == THREAD_BASED) { + val allowedParentNodes = "akka:typed-actor" :: "akka:untyped-actor" :: "typed-actor" :: "untyped-actor" :: Nil + if (!allowedParentNodes.contains(dispatcherElement.getParentNode.getNodeName)) { + throw new IllegalArgumentException("Thread based dispatcher must be nested in 'typed-actor' or 'untyped-actor' element!") + } + } + + if (properties.dispatcherType == HAWT) { // no name for HawtDispatcher + properties.name = dispatcherElement.getAttribute(NAME) + if (dispatcherElement.hasAttribute(AGGREGATE)) { + properties.aggregate = dispatcherElement.getAttribute(AGGREGATE).toBoolean + } + } else { + properties.name = mandatory(dispatcherElement, NAME) + } + + val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG); + if (threadPoolElement != null) { + if (properties.dispatcherType == THREAD_BASED) { + throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.") + } + val threadPoolProperties = parseThreadPool(threadPoolElement) + properties.threadPool = threadPoolProperties + } + properties + } + + /** + * Parses the given element and returns a ThreadPoolProperties. + * @param element dom element to parse + * @return configuration for the thread pool + */ + def parseThreadPool(element: Element): ThreadPoolProperties = { + val properties = new ThreadPoolProperties() + properties.queue = element.getAttribute(QUEUE) + if (element.hasAttribute(CAPACITY)) { + properties.capacity = element.getAttribute(CAPACITY).toInt + } + if (element.hasAttribute(BOUND)) { + properties.bound = element.getAttribute(BOUND).toInt + } + if (element.hasAttribute(FAIRNESS)) { + properties.fairness = element.getAttribute(FAIRNESS).toBoolean + } + if (element.hasAttribute(CORE_POOL_SIZE)) { + properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt + } + if (element.hasAttribute(MAX_POOL_SIZE)) { + properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt + } + if (element.hasAttribute(KEEP_ALIVE)) { + properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong + } + if (element.hasAttribute(REJECTION_POLICY)) { + properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY) + } + if (element.hasAttribute(MAILBOX_CAPACITY)) { + properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt + } + properties + } + + def hasRef(element: Element): Boolean = { + val ref = element.getAttribute(REF) + (ref != null) && !ref.isEmpty + } + +} + diff --git a/akka-spring/src/main/scala/ActorProperties.scala b/akka-spring/src/main/scala/ActorProperties.scala index 15c7e61fe0..0f86942935 100644 --- a/akka-spring/src/main/scala/ActorProperties.scala +++ b/akka-spring/src/main/scala/ActorProperties.scala @@ -8,7 +8,7 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder import AkkaSpringConfigurationTags._ /** - * Data container for typed actor configuration data. + * Data container for actor configuration data. * @author michaelkober * @author Martin Krasser */ @@ -20,6 +20,8 @@ class ActorProperties { var transactional: Boolean = false var host: String = "" var port: Int = _ + var serverManaged: Boolean = false + var serviceName: String = "" var lifecycle: String = "" var scope:String = VAL_SCOPE_SINGLETON var dispatcher: DispatcherProperties = _ @@ -34,6 +36,8 @@ class ActorProperties { builder.addPropertyValue("typed", typed) builder.addPropertyValue(HOST, host) builder.addPropertyValue(PORT, port) + builder.addPropertyValue("serverManaged", serverManaged) + builder.addPropertyValue("serviceName", serviceName) builder.addPropertyValue(TIMEOUT, timeout) builder.addPropertyValue(IMPLEMENTATION, target) builder.addPropertyValue(INTERFACE, interface) @@ -45,3 +49,26 @@ class ActorProperties { } } + +/** + * Data container for actor configuration data. + * @author michaelkober + */ +class ActorForProperties { + var interface: String = "" + var host: String = "" + var port: Int = _ + var serviceName: String = "" + + /** + * Sets the properties to the given builder. + * @param builder bean definition builder + */ + def setAsProperties(builder: BeanDefinitionBuilder) { + builder.addPropertyValue(HOST, host) + builder.addPropertyValue(PORT, port) + builder.addPropertyValue("serviceName", serviceName) + builder.addPropertyValue(INTERFACE, interface) + } + +} diff --git a/akka-spring/src/main/scala/AkkaNamespaceHandler.scala b/akka-spring/src/main/scala/AkkaNamespaceHandler.scala index a478b7b262..b1c58baa20 100644 --- a/akka-spring/src/main/scala/AkkaNamespaceHandler.scala +++ b/akka-spring/src/main/scala/AkkaNamespaceHandler.scala @@ -12,10 +12,11 @@ import AkkaSpringConfigurationTags._ */ class AkkaNamespaceHandler extends NamespaceHandlerSupport { def init = { - registerBeanDefinitionParser(TYPED_ACTOR_TAG, new TypedActorBeanDefinitionParser()); - registerBeanDefinitionParser(UNTYPED_ACTOR_TAG, new UntypedActorBeanDefinitionParser()); - registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser()); - registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser()); - registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser); + registerBeanDefinitionParser(TYPED_ACTOR_TAG, new TypedActorBeanDefinitionParser()) + registerBeanDefinitionParser(UNTYPED_ACTOR_TAG, new UntypedActorBeanDefinitionParser()) + registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser()) + registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser()) + registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser) + registerBeanDefinitionParser(ACTOR_FOR_TAG, new ActorForBeanDefinitionParser()); } } diff --git a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala index 4037f2c3ba..0e4de3576f 100644 --- a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala +++ b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala @@ -19,6 +19,7 @@ object AkkaSpringConfigurationTags { val DISPATCHER_TAG = "dispatcher" val PROPERTYENTRY_TAG = "property" val CAMEL_SERVICE_TAG = "camel-service" + val ACTOR_FOR_TAG = "actor-for" // actor sub tags val REMOTE_TAG = "remote" @@ -45,6 +46,8 @@ object AkkaSpringConfigurationTags { val TRANSACTIONAL = "transactional" val HOST = "host" val PORT = "port" + val MANAGED_BY = "managed-by" + val SERVICE_NAME = "service-name" val LIFECYCLE = "lifecycle" val SCOPE = "scope" @@ -101,4 +104,8 @@ object AkkaSpringConfigurationTags { val THREAD_BASED = "thread-based" val HAWT = "hawt" + // managed by types + val SERVER_MANAGED = "server" + val CLIENT_MANAGED = "client" + } diff --git a/akka-spring/src/main/scala/BeanParser.scala b/akka-spring/src/main/scala/BeanParser.scala deleted file mode 100644 index 1bbba9f09f..0000000000 --- a/akka-spring/src/main/scala/BeanParser.scala +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -import se.scalablesolutions.akka.util.Logging -import org.w3c.dom.Element -import org.springframework.util.xml.DomUtils - -/** - * Base trait with utility methods for bean parsing. - */ -trait BeanParser extends Logging { - - /** - * Get a mandatory element attribute. - * @param element the element with the mandatory attribute - * @param attribute name of the mandatory attribute - */ - def mandatory(element: Element, attribute: String): String = { - if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) { - throw new IllegalArgumentException("Mandatory attribute missing: " + attribute) - } else { - element.getAttribute(attribute) - } - } - - /** - * Get a mandatory child element. - * @param element the parent element - * @param childName name of the mandatory child element - */ - def mandatoryElement(element: Element, childName: String): Element = { - val childElement = DomUtils.getChildElementByTagName(element, childName); - if (childElement == null) { - throw new IllegalArgumentException("Mandatory element missing: ''") - } else { - childElement - } - } - -} diff --git a/akka-spring/src/main/scala/DispatcherParser.scala b/akka-spring/src/main/scala/DispatcherParser.scala deleted file mode 100644 index 4eaa4e05a7..0000000000 --- a/akka-spring/src/main/scala/DispatcherParser.scala +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -import org.w3c.dom.Element -import org.springframework.util.xml.DomUtils - -/** - * Parser trait for custom namespace for Akka dispatcher configuration. - * @author michaelkober - */ -trait DispatcherParser extends BeanParser { - import AkkaSpringConfigurationTags._ - - /** - * Parses the given element and returns a DispatcherProperties. - * @param element dom element to parse - * @return configuration for the dispatcher - */ - def parseDispatcher(element: Element): DispatcherProperties = { - val properties = new DispatcherProperties() - var dispatcherElement = element - if (hasRef(element)) { - val ref = element.getAttribute(REF) - dispatcherElement = element.getOwnerDocument.getElementById(ref) - if (dispatcherElement == null) { - throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'") - } - } - - properties.dispatcherType = mandatory(dispatcherElement, TYPE) - if (properties.dispatcherType == THREAD_BASED) { - val allowedParentNodes = "akka:typed-actor" :: "akka:untyped-actor" :: "typed-actor" :: "untyped-actor" :: Nil - if (!allowedParentNodes.contains(dispatcherElement.getParentNode.getNodeName)) { - throw new IllegalArgumentException("Thread based dispatcher must be nested in 'typed-actor' or 'untyped-actor' element!") - } - } - - if (properties.dispatcherType == HAWT) { // no name for HawtDispatcher - properties.name = dispatcherElement.getAttribute(NAME) - if (dispatcherElement.hasAttribute(AGGREGATE)) { - properties.aggregate = dispatcherElement.getAttribute(AGGREGATE).toBoolean - } - } else { - properties.name = mandatory(dispatcherElement, NAME) - } - - val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG); - if (threadPoolElement != null) { - if (properties.dispatcherType == THREAD_BASED) { - throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.") - } - val threadPoolProperties = parseThreadPool(threadPoolElement) - properties.threadPool = threadPoolProperties - } - properties - } - - /** - * Parses the given element and returns a ThreadPoolProperties. - * @param element dom element to parse - * @return configuration for the thread pool - */ - def parseThreadPool(element: Element): ThreadPoolProperties = { - val properties = new ThreadPoolProperties() - properties.queue = element.getAttribute(QUEUE) - if (element.hasAttribute(CAPACITY)) { - properties.capacity = element.getAttribute(CAPACITY).toInt - } - if (element.hasAttribute(BOUND)) { - properties.bound = element.getAttribute(BOUND).toInt - } - if (element.hasAttribute(FAIRNESS)) { - properties.fairness = element.getAttribute(FAIRNESS).toBoolean - } - if (element.hasAttribute(CORE_POOL_SIZE)) { - properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt - } - if (element.hasAttribute(MAX_POOL_SIZE)) { - properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt - } - if (element.hasAttribute(KEEP_ALIVE)) { - properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong - } - if (element.hasAttribute(REJECTION_POLICY)) { - properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY) - } - if (element.hasAttribute(MAILBOX_CAPACITY)) { - properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt - } - properties - } - - def hasRef(element: Element): Boolean = { - val ref = element.getAttribute(REF) - (ref != null) && !ref.isEmpty - } - -} diff --git a/akka-spring/src/main/scala/PropertyEntries.scala b/akka-spring/src/main/scala/PropertyEntries.scala index bf1898a805..9a7dc098de 100644 --- a/akka-spring/src/main/scala/PropertyEntries.scala +++ b/akka-spring/src/main/scala/PropertyEntries.scala @@ -18,3 +18,19 @@ class PropertyEntries { entryList.append(entry) } } + +/** + * Represents a property element + * @author Johan Rask + */ +class PropertyEntry { + var name: String = _ + var value: String = null + var ref: String = null + + + override def toString(): String = { + format("name = %s,value = %s, ref = %s", name, value, ref) + } +} + diff --git a/akka-spring/src/main/scala/PropertyEntry.scala b/akka-spring/src/main/scala/PropertyEntry.scala deleted file mode 100644 index 9fe6357fc0..0000000000 --- a/akka-spring/src/main/scala/PropertyEntry.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -/** - * Represents a property element - * @author Johan Rask - */ -class PropertyEntry { - var name: String = _ - var value: String = null - var ref: String = null - - - override def toString(): String = { - format("name = %s,value = %s, ref = %s", name, value, ref) - } -} diff --git a/akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala b/akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala deleted file mode 100644 index e8e0cef7d4..0000000000 --- a/akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -import org.springframework.beans.factory.support.BeanDefinitionBuilder -import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser -import org.springframework.beans.factory.xml.ParserContext -import AkkaSpringConfigurationTags._ -import org.w3c.dom.Element - - -/** - * Parser for custom namespace configuration. - * @author michaelkober - */ -class TypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser { - /* - * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) - */ - override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { - val typedActorConf = parseActor(element) - typedActorConf.typed = TYPED_ACTOR_TAG - typedActorConf.setAsProperties(builder) - } - - /* - * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) - */ - override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean] -} diff --git a/akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala b/akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala deleted file mode 100644 index 752e18559f..0000000000 --- a/akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -import org.springframework.beans.factory.support.BeanDefinitionBuilder -import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser -import org.springframework.beans.factory.xml.ParserContext -import AkkaSpringConfigurationTags._ -import org.w3c.dom.Element - - -/** - * Parser for custom namespace configuration. - * @author michaelkober - */ -class UntypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser { - /* - * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) - */ - override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { - val untypedActorConf = parseActor(element) - untypedActorConf.typed = UNTYPED_ACTOR_TAG - untypedActorConf.setAsProperties(builder) - } - - /* - * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) - */ - override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean] -} diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java index f2c5e24884..5a2a272e6c 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java @@ -8,14 +8,12 @@ package se.scalablesolutions.akka.spring.foo; * To change this template use File | Settings | File Templates. */ public interface IMyPojo { + public void oneWay(String message); + public String getFoo(); - public String getBar(); - - public void preRestart(); - - public void postRestart(); - public String longRunning(); + + } diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java index fe3e9ba767..8f610eef63 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java @@ -1,42 +1,34 @@ package se.scalablesolutions.akka.spring.foo; -import se.scalablesolutions.akka.actor.*; +import se.scalablesolutions.akka.actor.TypedActor; -public class MyPojo extends TypedActor implements IMyPojo{ +import java.util.concurrent.CountDownLatch; - private String foo; - private String bar; +public class MyPojo extends TypedActor implements IMyPojo { + + public static CountDownLatch latch = new CountDownLatch(1); + public static String lastOneWayMessage = null; + private String foo = "foo"; - public MyPojo() { - this.foo = "foo"; - this.bar = "bar"; - } + public MyPojo() { + } + public String getFoo() { + return foo; + } - public String getFoo() { - return foo; - } + public void oneWay(String message) { + lastOneWayMessage = message; + latch.countDown(); + } - - public String getBar() { - return bar; - } - - public void preRestart() { - System.out.println("pre restart"); - } - - public void postRestart() { - System.out.println("post restart"); - } - - public String longRunning() { - try { - Thread.sleep(6000); - } catch (InterruptedException e) { - } - return "this took long"; + public String longRunning() { + try { + Thread.sleep(6000); + } catch (InterruptedException e) { } + return "this took long"; + } } diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java index e447b26a28..3063a1b529 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java @@ -6,6 +6,8 @@ import se.scalablesolutions.akka.actor.ActorRef; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; +import java.util.concurrent.CountDownLatch; + /** * test class @@ -14,6 +16,9 @@ public class PingActor extends UntypedActor implements ApplicationContextAware { private String stringFromVal; private String stringFromRef; + public static String lastMessage = null; + public static CountDownLatch latch = new CountDownLatch(1); + private boolean gotApplicationContext = false; @@ -42,7 +47,6 @@ public class PingActor extends UntypedActor implements ApplicationContextAware { stringFromRef = s; } - private String longRunning() { try { Thread.sleep(6000); @@ -53,12 +57,12 @@ public class PingActor extends UntypedActor implements ApplicationContextAware { public void onReceive(Object message) throws Exception { if (message instanceof String) { - System.out.println("Ping received String message: " + message); + lastMessage = (String) message; if (message.equals("longRunning")) { - System.out.println("### starting pong"); ActorRef pongActor = UntypedActor.actorOf(PongActor.class).start(); pongActor.sendRequestReply("longRunning", getContext()); } + latch.countDown(); } else { throw new IllegalArgumentException("Unknown message: " + message); } diff --git a/akka-spring/src/test/resources/server-managed-config.xml b/akka-spring/src/test/resources/server-managed-config.xml new file mode 100644 index 0000000000..128b16c8b6 --- /dev/null +++ b/akka-spring/src/test/resources/server-managed-config.xml @@ -0,0 +1,57 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/akka-spring/src/test/resources/typed-actor-config.xml b/akka-spring/src/test/resources/typed-actor-config.xml index faca749469..989884e4fa 100644 --- a/akka-spring/src/test/resources/typed-actor-config.xml +++ b/akka-spring/src/test/resources/typed-actor-config.xml @@ -37,7 +37,7 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd"> implementation="se.scalablesolutions.akka.spring.foo.MyPojo" timeout="2000" transactional="true"> - + - + + + + val props = parser.parseActor(dom(xml).getDocumentElement); + assert(props != null) + assert(props.host === "com.some.host") + assert(props.port === 9999) + assert(props.serviceName === "my-service") + assert(props.serverManaged) } } } diff --git a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala index 8767b2e75a..3cdcd17cb0 100644 --- a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala @@ -4,10 +4,8 @@ package se.scalablesolutions.akka.spring -import foo.{IMyPojo, MyPojo} +import foo.{PingActor, IMyPojo, MyPojo} import se.scalablesolutions.akka.dispatch.FutureTimeoutException -import se.scalablesolutions.akka.remote.RemoteNode -import org.scalatest.FeatureSpec import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith @@ -16,13 +14,52 @@ import org.springframework.beans.factory.xml.XmlBeanDefinitionReader import org.springframework.context.ApplicationContext import org.springframework.context.support.ClassPathXmlApplicationContext import org.springframework.core.io.{ClassPathResource, Resource} +import org.scalatest.{BeforeAndAfterAll, FeatureSpec} +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer, RemoteNode} +import java.util.concurrent.CountDownLatch +import se.scalablesolutions.akka.actor.{TypedActor, RemoteTypedActorOne, Actor} +import se.scalablesolutions.akka.actor.remote.RemoteTypedActorOneImpl /** * Tests for spring configuration of typed actors. * @author michaelkober */ @RunWith(classOf[JUnitRunner]) -class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers { +class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with BeforeAndAfterAll { + + var server1: RemoteServer = null + var server2: RemoteServer = null + + override def beforeAll = { + val actor = Actor.actorOf[PingActor] // FIXME: remove this line when ticket 425 is fixed + server1 = new RemoteServer() + server1.start("localhost", 9990) + server2 = new RemoteServer() + server2.start("localhost", 9992) + + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + server1.registerTypedActor("typed-actor-service", typedActor) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterAll = { + try { + server1.shutdown + server2.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + def getTypedActorFromContext(config: String, id: String) : IMyPojo = { + MyPojo.latch = new CountDownLatch(1) + val context = new ClassPathXmlApplicationContext(config) + val myPojo: IMyPojo = context.getBean(id).asInstanceOf[IMyPojo] + myPojo + } + feature("parse Spring application context") { scenario("akka:typed-actor and akka:supervision and akka:dispatcher can be used as top level elements") { @@ -37,41 +74,79 @@ class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers { } scenario("get a typed actor") { - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("simple-typed-actor").asInstanceOf[IMyPojo] - var msg = myPojo.getFoo() - msg += myPojo.getBar() - assert(msg === "foobar") + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello 1") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello 1") } scenario("FutureTimeoutException when timed out") { - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("simple-typed-actor").asInstanceOf[IMyPojo] + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor") evaluating {myPojo.longRunning()} should produce[FutureTimeoutException] - } scenario("typed-actor with timeout") { - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("simple-typed-actor-long-timeout").asInstanceOf[IMyPojo] + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor-long-timeout") assert(myPojo.longRunning() === "this took long"); } scenario("transactional typed-actor") { - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("transactional-typed-actor").asInstanceOf[IMyPojo] - var msg = myPojo.getFoo() - msg += myPojo.getBar() - assert(msg === "foobar") + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "transactional-typed-actor") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello 2") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello 2") } scenario("get a remote typed-actor") { - RemoteNode.start - Thread.sleep(1000) - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("remote-typed-actor").asInstanceOf[IMyPojo] - assert(myPojo.getFoo === "foo") + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "remote-typed-actor") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello 3") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello 3") } + + scenario("get a client-managed-remote-typed-actor") { + val myPojo = getTypedActorFromContext("/server-managed-config.xml", "client-managed-remote-typed-actor") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello client-managed-remote-typed-actor") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello client-managed-remote-typed-actor") + } + + scenario("get a server-managed-remote-typed-actor") { + val serverPojo = getTypedActorFromContext("/server-managed-config.xml", "server-managed-remote-typed-actor") + // + val myPojoProxy = RemoteClient.typedActorFor(classOf[IMyPojo], classOf[IMyPojo].getName, 5000L, "localhost", 9990) + assert(myPojoProxy.getFoo() === "foo") + myPojoProxy.oneWay("hello server-managed-remote-typed-actor") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello server-managed-remote-typed-actor") + } + + scenario("get a server-managed-remote-typed-actor-custom-id") { + val serverPojo = getTypedActorFromContext("/server-managed-config.xml", "server-managed-remote-typed-actor-custom-id") + // + val myPojoProxy = RemoteClient.typedActorFor(classOf[IMyPojo], "mypojo-service", 5000L, "localhost", 9990) + assert(myPojoProxy.getFoo() === "foo") + myPojoProxy.oneWay("hello server-managed-remote-typed-actor 2") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello server-managed-remote-typed-actor 2") + } + + scenario("get a client proxy for server-managed-remote-typed-actor") { + MyPojo.latch = new CountDownLatch(1) + val context = new ClassPathXmlApplicationContext("/server-managed-config.xml") + val myPojo: IMyPojo = context.getBean("server-managed-remote-typed-actor-custom-id").asInstanceOf[IMyPojo] + // get client proxy from spring context + val myPojoProxy = context.getBean("typed-client-1").asInstanceOf[IMyPojo] + assert(myPojoProxy.getFoo() === "foo") + myPojoProxy.oneWay("hello") + MyPojo.latch.await + } + + } } diff --git a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala index 11246cdc91..0397d30bf0 100644 --- a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala @@ -6,74 +6,146 @@ package se.scalablesolutions.akka.spring import foo.PingActor import se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher -import se.scalablesolutions.akka.remote.RemoteNode -import se.scalablesolutions.akka.actor.ActorRef -import org.scalatest.FeatureSpec import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import org.springframework.context.ApplicationContext import org.springframework.context.support.ClassPathXmlApplicationContext +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer} +import org.scalatest.{BeforeAndAfterAll, FeatureSpec} +import java.util.concurrent.CountDownLatch +import se.scalablesolutions.akka.actor.{RemoteActorRef, ActorRegistry, Actor, ActorRef} /** * Tests for spring configuration of typed actors. * @author michaelkober */ @RunWith(classOf[JUnitRunner]) -class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers { +class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with BeforeAndAfterAll { + + var server1: RemoteServer = null + var server2: RemoteServer = null + + + override def beforeAll = { + val actor = Actor.actorOf[PingActor] // FIXME: remove this line when ticket 425 is fixed + server1 = new RemoteServer() + server1.start("localhost", 9990) + server2 = new RemoteServer() + server2.start("localhost", 9992) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterAll = { + try { + server1.shutdown + server2.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + + def getPingActorFromContext(config: String, id: String) : ActorRef = { + PingActor.latch = new CountDownLatch(1) + val context = new ClassPathXmlApplicationContext(config) + val pingActor = context.getBean(id).asInstanceOf[ActorRef] + assert(pingActor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") + pingActor.start() + } + + feature("parse Spring application context") { - scenario("get an untyped actor") { - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("simple-untyped-actor").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() + scenario("get a untyped actor") { + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor") myactor.sendOneWay("Hello") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello") assert(myactor.isDefinedAt("some string message")) } scenario("untyped-actor with timeout") { - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("simple-untyped-actor-long-timeout").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() - myactor.sendOneWay("Hello") + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor-long-timeout") assert(myactor.getTimeout() === 10000) + myactor.sendOneWay("Hello 2") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello 2") } scenario("transactional untyped-actor") { - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("transactional-untyped-actor").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() - myactor.sendOneWay("Hello") - assert(myactor.isDefinedAt("some string message")) + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "transactional-untyped-actor") + myactor.sendOneWay("Hello 3") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello 3") } scenario("get a remote typed-actor") { - RemoteNode.start - Thread.sleep(1000) - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("remote-untyped-actor").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() - myactor.sendOneWay("Hello") - assert(myactor.isDefinedAt("some string message")) + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "remote-untyped-actor") + myactor.sendOneWay("Hello 4") assert(myactor.getRemoteAddress().isDefined) assert(myactor.getRemoteAddress().get.getHostName() === "localhost") - assert(myactor.getRemoteAddress().get.getPort() === 9999) + assert(myactor.getRemoteAddress().get.getPort() === 9992) + PingActor.latch.await + assert(PingActor.lastMessage === "Hello 4") } scenario("untyped-actor with custom dispatcher") { - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("untyped-actor-with-dispatcher").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() - myactor.sendOneWay("Hello") + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "untyped-actor-with-dispatcher") assert(myactor.getTimeout() === 1000) assert(myactor.getDispatcher.isInstanceOf[ExecutorBasedEventDrivenWorkStealingDispatcher]) + myactor.sendOneWay("Hello 5") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello 5") } + + scenario("create client managed remote untyped-actor") { + val myactor = getPingActorFromContext("/server-managed-config.xml", "client-managed-remote-untyped-actor") + myactor.sendOneWay("Hello client managed remote untyped-actor") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello client managed remote untyped-actor") + assert(myactor.getRemoteAddress().isDefined) + assert(myactor.getRemoteAddress().get.getHostName() === "localhost") + assert(myactor.getRemoteAddress().get.getPort() === 9990) + } + + scenario("create server managed remote untyped-actor") { + val myactor = getPingActorFromContext("/server-managed-config.xml", "server-managed-remote-untyped-actor") + val nrOfActors = ActorRegistry.actors.length + val actorRef = RemoteClient.actorFor("se.scalablesolutions.akka.spring.foo.PingActor", "localhost", 9990) + actorRef.sendOneWay("Hello server managed remote untyped-actor") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello server managed remote untyped-actor") + assert(ActorRegistry.actors.length === nrOfActors) + } + + scenario("create server managed remote untyped-actor with custom service id") { + val myactor = getPingActorFromContext("/server-managed-config.xml", "server-managed-remote-untyped-actor-custom-id") + val nrOfActors = ActorRegistry.actors.length + val actorRef = RemoteClient.actorFor("ping-service", "localhost", 9990) + actorRef.sendOneWay("Hello server managed remote untyped-actor") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello server managed remote untyped-actor") + assert(ActorRegistry.actors.length === nrOfActors) + } + + scenario("get client actor for server managed remote untyped-actor") { + PingActor.latch = new CountDownLatch(1) + val context = new ClassPathXmlApplicationContext("/server-managed-config.xml") + val pingActor = context.getBean("server-managed-remote-untyped-actor-custom-id").asInstanceOf[ActorRef] + assert(pingActor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") + pingActor.start() + val nrOfActors = ActorRegistry.actors.length + // get client actor ref from spring context + val actorRef = context.getBean("client-1").asInstanceOf[ActorRef] + assert(actorRef.isInstanceOf[RemoteActorRef]) + actorRef.sendOneWay("Hello") + PingActor.latch.await + assert(ActorRegistry.actors.length === nrOfActors) + } + } } diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 52bd0e6cb6..e60b11c3d6 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -16,9 +16,8 @@ import org.codehaus.aspectwerkz.proxy.Proxy import org.codehaus.aspectwerkz.annotation.{Aspect, Around} import java.net.InetSocketAddress -import java.lang.reflect.{InvocationTargetException, Method, Field} - import scala.reflect.BeanProperty +import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy} /** * TypedActor is a type-safe actor made out of a POJO with interface. @@ -390,7 +389,8 @@ object TypedActor extends Logging { typedActor.initialize(proxy) if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef) - AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, None, config.timeout)) + if (config._host.isDefined) actorRef.makeRemote(config._host.get) + AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout)) actorRef.start proxy.asInstanceOf[T] } @@ -408,24 +408,47 @@ object TypedActor extends Logging { proxy.asInstanceOf[T] } -/* - // NOTE: currently not used - but keep it around - private[akka] def newInstance[T <: TypedActor](targetClass: Class[T], - remoteAddress: Option[InetSocketAddress], timeout: Long): T = { - val proxy = { - val instance = Proxy.newInstance(targetClass, true, false) - if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor] - else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'") + /** + * Create a proxy for a RemoteActorRef representing a server managed remote typed actor. + * + */ + private[akka] def createProxyForRemoteActorRef[T](intfClass: Class[T], actorRef: ActorRef): T = { + + class MyInvocationHandler extends InvocationHandler { + def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = { + // do nothing, this is just a dummy + null + } } - val context = injectTypedActorContext(proxy) - actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context) - actorRef.timeout = timeout - if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) - AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout)) - actorRef.start - proxy.asInstanceOf[T] + val handler = new MyInvocationHandler() + + val interfaces = Array(intfClass, classOf[ServerManagedTypedActor]).asInstanceOf[Array[java.lang.Class[_]]] + val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler) + val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false) + + AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L)) + awProxy.asInstanceOf[T] } -*/ + + + /* + // NOTE: currently not used - but keep it around + private[akka] def newInstance[T <: TypedActor](targetClass: Class[T], + remoteAddress: Option[InetSocketAddress], timeout: Long): T = { + val proxy = { + val instance = Proxy.newInstance(targetClass, true, false) + if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor] + else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'") + } + val context = injectTypedActorContext(proxy) + actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context) + actorRef.timeout = timeout + if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) + AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout)) + actorRef.start + proxy.asInstanceOf[T] + } + */ /** * Stops the current Typed Actor. @@ -546,6 +569,30 @@ object TypedActor extends Logging { private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint] } + +/** + * AspectWerkz Aspect that is turning POJO into proxy to a server managed remote TypedActor. + *

+ * Is deployed on a 'perInstance' basis with the pointcut 'execution(* *.*(..))', + * e.g. all methods on the instance. + * + * @author Jonas Bonér + */ +@Aspect("perInstance") +private[akka] sealed class ServerManagedTypedActorAspect extends ActorAspect { + + @Around("execution(* *.*(..)) && this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)") + def invoke(joinPoint: JoinPoint): AnyRef = { + if (!isInitialized) initialize(joinPoint) + remoteDispatch(joinPoint) + } + + override def initialize(joinPoint: JoinPoint): Unit = { + super.initialize(joinPoint) + remoteAddress = actorRef.remoteAddress + } +} + /** * AspectWerkz Aspect that is turning POJO into TypedActor. *

@@ -555,18 +602,9 @@ object TypedActor extends Logging { * @author Jonas Bonér */ @Aspect("perInstance") -private[akka] sealed class TypedActorAspect { - @volatile private var isInitialized = false - @volatile private var isStopped = false - private var interfaceClass: Class[_] = _ - private var typedActor: TypedActor = _ - private var actorRef: ActorRef = _ - private var remoteAddress: Option[InetSocketAddress] = _ - private var timeout: Long = _ - private var uuid: String = _ - @volatile private var instance: TypedActor = _ +private[akka] sealed class TypedActorAspect extends ActorAspect { - @Around("execution(* *.*(..))") + @Around("execution(* *.*(..)) && !this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)") def invoke(joinPoint: JoinPoint): AnyRef = { if (!isInitialized) initialize(joinPoint) dispatch(joinPoint) @@ -576,12 +614,26 @@ private[akka] sealed class TypedActorAspect { if (remoteAddress.isDefined) remoteDispatch(joinPoint) else localDispatch(joinPoint) } +} - private def localDispatch(joinPoint: JoinPoint): AnyRef = { - val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] - val isOneWay = TypedActor.isOneWay(methodRtti) +/** + * Base class for TypedActorAspect and ServerManagedTypedActorAspect to reduce code duplication. + */ +private[akka] abstract class ActorAspect { + @volatile protected var isInitialized = false + @volatile protected var isStopped = false + protected var interfaceClass: Class[_] = _ + protected var typedActor: TypedActor = _ + protected var actorRef: ActorRef = _ + protected var timeout: Long = _ + protected var uuid: String = _ + protected var remoteAddress: Option[InetSocketAddress] = _ + + protected def localDispatch(joinPoint: JoinPoint): AnyRef = { + val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] + val isOneWay = TypedActor.isOneWay(methodRtti) val senderActorRef = Some(SenderContextInfo.senderActorRef.value) - val senderProxy = Some(SenderContextInfo.senderProxy.value) + val senderProxy = Some(SenderContextInfo.senderProxy.value) typedActor.context._sender = senderProxy if (!actorRef.isRunning && !isStopped) { @@ -602,7 +654,7 @@ private[akka] sealed class TypedActorAspect { } } - private def remoteDispatch(joinPoint: JoinPoint): AnyRef = { + protected def remoteDispatch(joinPoint: JoinPoint): AnyRef = { val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] val isOneWay = TypedActor.isOneWay(methodRtti) @@ -641,7 +693,7 @@ private[akka] sealed class TypedActorAspect { (escapedArgs, isEscaped) } - private def initialize(joinPoint: JoinPoint): Unit = { + protected def initialize(joinPoint: JoinPoint): Unit = { val init = AspectInitRegistry.initFor(joinPoint.getThis) interfaceClass = init.interfaceClass typedActor = init.targetInstance @@ -653,6 +705,7 @@ private[akka] sealed class TypedActorAspect { } } + /** * Internal helper class to help pass the contextual information between threads. * @@ -704,5 +757,11 @@ private[akka] sealed case class AspectInit( val timeout: Long) { def this(interfaceClass: Class[_], targetInstance: TypedActor, actorRef: ActorRef, timeout: Long) = this(interfaceClass, targetInstance, actorRef, None, timeout) + } + +/** + * Marker interface for server manager typed actors. + */ +private[akka] sealed trait ServerManagedTypedActor extends TypedActor \ No newline at end of file diff --git a/akka-typed-actor/src/main/scala/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/config/TypedActorGuiceConfigurator.scala index 339c4d297d..5ca249a3ec 100644 --- a/akka-typed-actor/src/main/scala/config/TypedActorGuiceConfigurator.scala +++ b/akka-typed-actor/src/main/scala/config/TypedActorGuiceConfigurator.scala @@ -122,7 +122,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa remoteAddress.foreach { address => actorRef.makeRemote(remoteAddress.get) - RemoteServerModule.registerTypedActor(address, implementationClass.getName, proxy) } AspectInitRegistry.register( diff --git a/akka-typed-actor/src/test/resources/META-INF/aop.xml b/akka-typed-actor/src/test/resources/META-INF/aop.xml index bdc167ca54..be133a51b8 100644 --- a/akka-typed-actor/src/test/resources/META-INF/aop.xml +++ b/akka-typed-actor/src/test/resources/META-INF/aop.xml @@ -2,6 +2,7 @@ +