From 60d4010421d73d241f809974ab6d3423facf61f1 Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Mon, 6 Sep 2010 10:15:44 +0200 Subject: [PATCH 1/8] started working on ticket 194 --- .../src/main/scala/actor/ActorRef.scala | 7 +- .../src/main/scala/remote/RemoteClient.scala | 16 +- .../src/main/scala/remote/RemoteServer.scala | 44 ++++- .../akka/actor/RemoteTypedActorOneImpl.java | 3 + .../src/test/resources/META-INF/aop.xml | 1 + .../ServerInitiatedRemoteTypedActorSpec.scala | 86 +++++++++ .../src/main/scala/actor/TypedActor.scala | 176 +++++++++++++++--- .../typed-actor/TypedActorLifecycleSpec.scala | 1 + 8 files changed, 299 insertions(+), 35 deletions(-) create mode 100644 akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index a6b42db579..c712a8d408 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -1362,7 +1362,8 @@ private[akka] case class RemoteActorRef private[akka] ( val hostname: String, val port: Int, _timeout: Long, - loader: Option[ClassLoader]) + loader: Option[ClassLoader], + val actorType: ActorType = ActorType.ScalaActor ) extends ActorRef with ScalaActorRef { ensureRemotingEnabled @@ -1375,7 +1376,7 @@ private[akka] case class RemoteActorRef private[akka] ( def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = RemoteClientModule.send[Any]( - message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor) + message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType) def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, @@ -1383,7 +1384,7 @@ private[akka] case class RemoteActorRef private[akka] ( senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { val future = RemoteClientModule.send[T]( - message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, ActorType.ScalaActor) + message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index f61a5d63a1..689f55a049 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{HashSet, HashMap} import scala.reflect.BeanProperty +import se.scalablesolutions.akka.actor._ /** * Atomic remote request/reply message id generator. @@ -76,11 +77,19 @@ object RemoteClient extends Logging { private val remoteClients = new HashMap[String, RemoteClient] private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]] - // FIXME: simplify overloaded methods when we have Scala 2.8 - def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None) + // FIXME: + def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int) : T = { + + println("### create RemoteActorRef") + val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, None, ActorType.TypedActor) + val proxy = TypedActor.createProxyForRemoteActorRef(intfClass, actorRef) + proxy + + } + def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, Some(loader)) @@ -99,6 +108,9 @@ object RemoteClient extends Logging { def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = RemoteActorRef(serviceId, className, hostname, port, timeout, None) + + + private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader)) diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 5f24def4f5..af1507c3cb 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -120,9 +120,12 @@ object RemoteServer { } } - private class RemoteActorSet { - private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] - private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] + // FIXME private + class RemoteActorSet { + //private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] + val actors = new ConcurrentHashMap[String, ActorRef] + //private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] + val typedActors = new ConcurrentHashMap[String, AnyRef] } private val guard = new ReadWriteGuard @@ -130,11 +133,14 @@ object RemoteServer { private val remoteServers = Map[Address, RemoteServer]() private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard { - actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) + // FIXME + //actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) + val actors = actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors + actors.put(uuid, actor) } - private[akka] def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef) = guard.withWriteGuard { - actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(name, typedActor) + private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: TypedActor) = guard.withWriteGuard { + actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor) } private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard { @@ -159,7 +165,9 @@ object RemoteServer { remoteServers.remove(Address(hostname, port)) } - private def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = { + // FIXME + def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = { + println("##### actorsFor SIZE=" + remoteActorSets.size) remoteActorSets.getOrElseUpdate(remoteServerAddress,new RemoteActorSet) } } @@ -195,7 +203,7 @@ class RemoteServer extends Logging with ListenerManagement { private[akka] var hostname = RemoteServer.HOSTNAME private[akka] var port = RemoteServer.PORT - + @volatile private var _isRunning = false private val factory = new NioServerSocketChannelFactory( @@ -270,7 +278,22 @@ class RemoteServer extends Logging with ListenerManagement { } } - // TODO: register typed actor in RemoteServer as well + // FIXME: register typed actor in RemoteServer as well + def registerTypedActor(id: String, actorRef: AnyRef): Unit = synchronized { + val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors + if (!typedActors.contains(id)) { + log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", actorRef.getClass.getName, id, hostname, port) + typedActors.put(id, actorRef) + } + } + + private def actors() : ConcurrentHashMap[String, ActorRef] = { + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors + } + + private def typedActors() : ConcurrentHashMap[String, AnyRef] = { + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors + } /** * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already. @@ -285,6 +308,7 @@ class RemoteServer extends Logging with ListenerManagement { def register(id: String, actorRef: ActorRef): Unit = synchronized { if (_isRunning) { val actors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors + println("___ register ___ " + actors.hashCode + " hostname=" + hostname + " port="+ port) if (!actors.contains(id)) { if (!actorRef.isRunning) actorRef.start log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) @@ -320,6 +344,8 @@ class RemoteServer extends Logging with ListenerManagement { } } + //FIXME: unregister typed Actor + protected override def manageLifeCycleOfListeners = false protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) diff --git a/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java b/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java index 715e5366a4..a30aa26124 100644 --- a/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java +++ b/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java @@ -18,9 +18,12 @@ public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedAc } public void oneWay() throws Exception { + System.out.println("--------> got it!!!!!!"); RemoteTypedActorLog.oneWayLog().put("oneway"); } + + @Override public void preRestart(Throwable e) { try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} diff --git a/akka-remote/src/test/resources/META-INF/aop.xml b/akka-remote/src/test/resources/META-INF/aop.xml index bdc167ca54..be133a51b8 100644 --- a/akka-remote/src/test/resources/META-INF/aop.xml +++ b/akka-remote/src/test/resources/META-INF/aop.xml @@ -2,6 +2,7 @@ + diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala new file mode 100644 index 0000000000..76c50f7658 --- /dev/null +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor.remote + +import java.util.concurrent.{CountDownLatch, TimeUnit} +import org.scalatest.junit.JUnitSuite +import org.junit.{Test, Before, After} + +import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} +import se.scalablesolutions.akka.actor._ + +object ServerInitiatedRemoteTypedActorSpec { + val HOSTNAME = "localhost" + val PORT = 9990 + var server: RemoteServer = null + + class SimpleActor extends Actor { + def receive = { + case _ => println("received message") + } + } + + +} + +class ServerInitiatedRemoteTypedActorSpec extends JUnitSuite { + import ServerInitiatedRemoteTypedActorSpec._ + private val unit = TimeUnit.MILLISECONDS + + + @Before + def init { + server = new RemoteServer() + server.start(HOSTNAME, PORT) + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + @After + def finished { + try { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + + @Test + def shouldSendWithBang { + + /* + val clientManangedTypedActor = TypedActor.newRemoteInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000, HOSTNAME, PORT) + clientManangedTypedActor.requestReply("test-string") + Thread.sleep(2000) + println("###########") + */ + /* + trace() + val actor = Actor.actorOf[SimpleActor].start + server.register("simple-actor", actor) + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + server.registerTypedActor("typed-actor-service", typedActor) + println("### registered actor") + trace() + + //val actorRef = RemoteActorRef("typed-actor-service", classOf[RemoteTypedActorOneImpl].getName, HOSTNAME, PORT, 5000L, None) + //val myActor = TypedActor.createProxyForRemoteActorRef(classOf[RemoteTypedActorOne], actorRef) + val myActor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", classOf[RemoteTypedActorOneImpl].getName, 5000L, HOSTNAME, PORT) + println("### call one way") + myActor.oneWay() + Thread.sleep(3000) + println("### call one way - done") + */ + //assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) + //actor.stop + /* */ + } + + +} + diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index b27f5b4b4d..7f0ee5dbbc 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -16,9 +16,8 @@ import org.codehaus.aspectwerkz.proxy.Proxy import org.codehaus.aspectwerkz.annotation.{Aspect, Around} import java.net.InetSocketAddress -import java.lang.reflect.{InvocationTargetException, Method, Field} - import scala.reflect.BeanProperty +import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy} /** * TypedActor is a type-safe actor made out of a POJO with interface. @@ -183,6 +182,7 @@ abstract class TypedActor extends Actor with Proxyable { case Link(proxy) => self.link(proxy) case Unlink(proxy) => self.unlink(proxy) + case method: String => println("### got method") case unexpected => throw new IllegalActorStateException( "Unexpected message [" + unexpected + "] sent to [" + this + "]") } @@ -408,24 +408,47 @@ object TypedActor extends Logging { proxy.asInstanceOf[T] } -/* - // NOTE: currently not used - but keep it around - private[akka] def newInstance[T <: TypedActor](targetClass: Class[T], - remoteAddress: Option[InetSocketAddress], timeout: Long): T = { - val proxy = { - val instance = Proxy.newInstance(targetClass, true, false) - if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor] - else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'") + // FIXME + def createProxyForRemoteActorRef[T](intfClass: Class[T], actorRef: ActorRef): T = { + + class MyInvocationHandler extends InvocationHandler { + def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = { + // do nothing, this is just a dummy + null + } } - val context = injectTypedActorContext(proxy) - actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context) - actorRef.timeout = timeout - if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) - AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout)) - actorRef.start - proxy.asInstanceOf[T] + val handler = new MyInvocationHandler() + + val interfaces = Array(intfClass, classOf[ServerManagedTypedActor]).asInstanceOf[Array[java.lang.Class[_]]] + val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler) + val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false) + + // TODO: needed? + //typedActor.initialize(proxy) + // TODO: timeout?? + AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L)) + awProxy.asInstanceOf[T] } -*/ + + + /* + // NOTE: currently not used - but keep it around + private[akka] def newInstance[T <: TypedActor](targetClass: Class[T], + remoteAddress: Option[InetSocketAddress], timeout: Long): T = { + val proxy = { + val instance = Proxy.newInstance(targetClass, true, false) + if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor] + else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'") + } + val context = injectTypedActorContext(proxy) + actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context) + actorRef.timeout = timeout + if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) + AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout)) + actorRef.start + proxy.asInstanceOf[T] + } + */ /** * Stops the current Typed Actor. @@ -546,6 +569,114 @@ object TypedActor extends Logging { private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint] } +/** + * AspectWerkz Aspect that is turning POJO into proxy to a server managed remote TypedActor. + *

+ * Is deployed on a 'perInstance' basis with the pointcut 'execution(* *.*(..))', + * e.g. all methods on the instance. + * + * @author Jonas Bonér + */ +@Aspect("perInstance") +private[akka] sealed class ServerManagedTypedActorAspect { + @volatile private var isInitialized = false + @volatile private var isStopped = false + private var interfaceClass: Class[_] = _ + private var actorRef: ActorRef = _ + private var timeout: Long = _ + private var uuid: String = _ + private var remoteAddress: Option[InetSocketAddress] = _ + + //FIXME + + @Around("execution(* *.*(..)) && this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)") + def invoke(joinPoint: JoinPoint): AnyRef = { + println("### MyAspect intercepted " + joinPoint.getSignature) + if (!isInitialized) initialize(joinPoint) + remoteDispatch(joinPoint) + } + + + private def remoteDispatch(joinPoint: JoinPoint): AnyRef = { + val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] + val isOneWay = TypedActor.isOneWay(methodRtti) + + val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues) + + println("### remote dispatch...") + + val future = RemoteClientModule.send[AnyRef]( + message, None, None, remoteAddress.get, + timeout, isOneWay, actorRef, + Some((interfaceClass.getName, methodRtti.getMethod.getName)), + ActorType.TypedActor) + + if (isOneWay) null // for void methods + else { + if (future.isDefined) { + future.get.await + val result = getResultOrThrowException(future.get) + if (result.isDefined) result.get + else throw new IllegalActorStateException("No result returned from call to [" + joinPoint + "]") + } else throw new IllegalActorStateException("No future returned from call to [" + joinPoint + "]") + } + } + + /* + private def remoteDispatch(joinPoint: JoinPoint): AnyRef = { + val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] + val isOneWay = TypedActor.isOneWay(methodRtti) + val senderActorRef = Some(SenderContextInfo.senderActorRef.value) + + + if (!actorRef.isRunning && !isStopped) { + isStopped = true + // FIXME - what to do? + // joinPoint.proceed + null + } else if (isOneWay) { + actorRef.!("joinPoint") + //actorRef.!(joinPoint)(senderActorRef) + null.asInstanceOf[AnyRef] + } else if (TypedActor.returnsFuture_?(methodRtti)) { + actorRef.!!!(joinPoint, timeout)(senderActorRef) + } else { + val result = (actorRef.!!(joinPoint, timeout)(senderActorRef)).as[AnyRef] + if (result.isDefined) result.get + else throw new ActorTimeoutException("Invocation to [" + joinPoint + "] timed out.") + } + } + */ + private def getResultOrThrowException[T](future: Future[T]): Option[T] = + if (future.exception.isDefined) throw future.exception.get + else future.result + + private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = { + var isEscaped = false + val escapedArgs = for (arg <- args) yield { + val clazz = arg.getClass + if (clazz.getName.contains(TypedActor.AW_PROXY_PREFIX)) { + isEscaped = true + TypedActor.AW_PROXY_PREFIX + clazz.getSuperclass.getName + } else arg + } + (escapedArgs, isEscaped) + } + + + private def initialize(joinPoint: JoinPoint): Unit = { + val init = AspectInitRegistry.initFor(joinPoint.getThis) + interfaceClass = init.interfaceClass + actorRef = init.actorRef + uuid = actorRef.uuid + remoteAddress = actorRef.remoteAddress + println("### address= " + remoteAddress.get) + timeout = init.timeout + isInitialized = true + } + +} + /** * AspectWerkz Aspect that is turning POJO into TypedActor. *

@@ -564,9 +695,8 @@ private[akka] sealed class TypedActorAspect { private var remoteAddress: Option[InetSocketAddress] = _ private var timeout: Long = _ private var uuid: String = _ - @volatile private var instance: TypedActor = _ - - @Around("execution(* *.*(..))") + + @Around("execution(* *.*(..)) && !this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)") def invoke(joinPoint: JoinPoint): AnyRef = { if (!isInitialized) initialize(joinPoint) dispatch(joinPoint) @@ -653,6 +783,7 @@ private[akka] sealed class TypedActorAspect { } } + /** * Internal helper class to help pass the contextual information between threads. * @@ -704,5 +835,8 @@ private[akka] sealed case class AspectInit( val timeout: Long) { def this(interfaceClass: Class[_], targetInstance: TypedActor, actorRef: ActorRef, timeout: Long) = this(interfaceClass, targetInstance, actorRef, None, timeout) + } + +private[akka] sealed trait ServerManagedTypedActor extends TypedActor \ No newline at end of file diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala index 10fc40493b..1fcbe0c5ef 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala @@ -45,6 +45,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft fail("expected exception not thrown") } catch { case e: RuntimeException => { + println("#failed") cdl.await assert(SamplePojoImpl._pre) assert(SamplePojoImpl._post) From ec61c29f21672f17cd30292192cbbee0d2ea8aae Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Mon, 6 Sep 2010 16:33:55 +0200 Subject: [PATCH 2/8] implemented server managed typed actor --- .../src/main/scala/actor/ActorRef.scala | 2 +- .../src/main/scala/remote/RemoteClient.scala | 28 ++-- .../src/main/scala/remote/RemoteServer.scala | 60 ++++--- .../akka/actor/RemoteTypedActorOneImpl.java | 3 - .../ServerInitiatedRemoteActorSpec.scala | 1 + .../ServerInitiatedRemoteTypedActorSpec.scala | 116 ++++++++------ .../src/main/scala/actor/TypedActor.scala | 146 +++++------------- .../typed-actor/TypedActorLifecycleSpec.scala | 1 - 8 files changed, 155 insertions(+), 202 deletions(-) diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index c712a8d408..5da69ea7c2 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -1363,7 +1363,7 @@ private[akka] case class RemoteActorRef private[akka] ( val port: Int, _timeout: Long, loader: Option[ClassLoader], - val actorType: ActorType = ActorType.ScalaActor ) + val actorType: ActorType = ActorType.ScalaActor) extends ActorRef with ScalaActorRef { ensureRemotingEnabled diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index 689f55a049..dc3cf6b146 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -80,16 +80,6 @@ object RemoteClient extends Logging { def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None) - // FIXME: - def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int) : T = { - - println("### create RemoteActorRef") - val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, None, ActorType.TypedActor) - val proxy = TypedActor.createProxyForRemoteActorRef(intfClass, actorRef) - proxy - - } - def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, Some(loader)) @@ -108,8 +98,26 @@ object RemoteClient extends Logging { def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = RemoteActorRef(serviceId, className, hostname, port, timeout, None) + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int) : T = { + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, 5000L, hostname, port, None) + } + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int) : T = { + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None) + } + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = { + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader)) + } + + def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = { + typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) + } + + private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]) : T = { + val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, ActorType.TypedActor) + TypedActor.createProxyForRemoteActorRef(intfClass, actorRef) + } private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader)) diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index af1507c3cb..5a7fbd00d9 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -10,7 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors} import java.util.{Map => JMap} import se.scalablesolutions.akka.actor.{ - Actor, TypedActor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, RemoteActorSystemMessage} + Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.util._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ @@ -120,12 +120,9 @@ object RemoteServer { } } - // FIXME private - class RemoteActorSet { - //private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] - val actors = new ConcurrentHashMap[String, ActorRef] - //private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] - val typedActors = new ConcurrentHashMap[String, AnyRef] + private class RemoteActorSet { + private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] + private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] } private val guard = new ReadWriteGuard @@ -133,13 +130,10 @@ object RemoteServer { private val remoteServers = Map[Address, RemoteServer]() private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard { - // FIXME - //actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) - val actors = actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors - actors.put(uuid, actor) + actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) } - private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: TypedActor) = guard.withWriteGuard { + private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard { actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor) } @@ -165,9 +159,7 @@ object RemoteServer { remoteServers.remove(Address(hostname, port)) } - // FIXME - def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = { - println("##### actorsFor SIZE=" + remoteActorSets.size) + private def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = { remoteActorSets.getOrElseUpdate(remoteServerAddress,new RemoteActorSet) } } @@ -278,23 +270,19 @@ class RemoteServer extends Logging with ListenerManagement { } } - // FIXME: register typed actor in RemoteServer as well - def registerTypedActor(id: String, actorRef: AnyRef): Unit = synchronized { + /** + * Register remote typed actor by a specific id. + * @param id custom actor id + * @param typedActor typed actor to register + */ + def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized { val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors if (!typedActors.contains(id)) { - log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", actorRef.getClass.getName, id, hostname, port) - typedActors.put(id, actorRef) + log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port) + typedActors.put(id, typedActor) } } - private def actors() : ConcurrentHashMap[String, ActorRef] = { - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors - } - - private def typedActors() : ConcurrentHashMap[String, AnyRef] = { - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors - } - /** * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already. */ @@ -308,7 +296,6 @@ class RemoteServer extends Logging with ListenerManagement { def register(id: String, actorRef: ActorRef): Unit = synchronized { if (_isRunning) { val actors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors - println("___ register ___ " + actors.hashCode + " hostname=" + hostname + " port="+ port) if (!actors.contains(id)) { if (!actorRef.isRunning) actorRef.start log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) @@ -344,16 +331,27 @@ class RemoteServer extends Logging with ListenerManagement { } } - //FIXME: unregister typed Actor + /** + * Unregister Remote Typed Actor by specific 'id'. + *

+ * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregisterTypedActor(id: String):Unit = synchronized { + if (_isRunning) { + log.info("Unregistering server side remote typed actor with id [%s]", id) + val registeredTypedActors = typedActors() + registeredTypedActors.remove(id) + } + } protected override def manageLifeCycleOfListeners = false protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) - private def actors() : ConcurrentHashMap[String, ActorRef] = { + private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = { RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors } - private def typedActors() : ConcurrentHashMap[String, AnyRef] = { + private[akka] def typedActors() : ConcurrentHashMap[String, AnyRef] = { RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors } } diff --git a/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java b/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java index a30aa26124..715e5366a4 100644 --- a/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java +++ b/akka-remote/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java @@ -18,12 +18,9 @@ public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedAc } public void oneWay() throws Exception { - System.out.println("--------> got it!!!!!!"); RemoteTypedActorLog.oneWayLog().put("oneway"); } - - @Override public void preRestart(Throwable e) { try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 8b1e0ef765..012d42f92a 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -144,5 +144,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { assert(numberOfActorsInRegistry === ActorRegistry.actors.length) actor.stop } + } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala index 76c50f7658..b800fbf2c3 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala @@ -4,42 +4,46 @@ package se.scalablesolutions.akka.actor.remote -import java.util.concurrent.{CountDownLatch, TimeUnit} -import org.scalatest.junit.JUnitSuite -import org.junit.{Test, Before, After} +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} import se.scalablesolutions.akka.actor._ +import RemoteTypedActorLog._ object ServerInitiatedRemoteTypedActorSpec { val HOSTNAME = "localhost" val PORT = 9990 var server: RemoteServer = null - - class SimpleActor extends Actor { - def receive = { - case _ => println("received message") - } - } - - } -class ServerInitiatedRemoteTypedActorSpec extends JUnitSuite { +@RunWith(classOf[JUnitRunner]) +class ServerInitiatedRemoteTypedActorSpec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { import ServerInitiatedRemoteTypedActorSpec._ + private val unit = TimeUnit.MILLISECONDS - @Before - def init { + override def beforeAll = { server = new RemoteServer() server.start(HOSTNAME, PORT) + + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + server.registerTypedActor("typed-actor-service", typedActor) + Thread.sleep(1000) } // make sure the servers shutdown cleanly after the test has finished - @After - def finished { + override def afterAll = { try { server.shutdown RemoteClient.shutdownAll @@ -49,38 +53,60 @@ class ServerInitiatedRemoteTypedActorSpec extends JUnitSuite { } } + describe("Server managed remote typed Actor ") { - @Test - def shouldSendWithBang { + it("should receive one-way message") { + clearMessageLogs + val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + } - /* - val clientManangedTypedActor = TypedActor.newRemoteInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000, HOSTNAME, PORT) - clientManangedTypedActor.requestReply("test-string") - Thread.sleep(2000) - println("###########") - */ - /* - trace() - val actor = Actor.actorOf[SimpleActor].start - server.register("simple-actor", actor) - val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) - server.registerTypedActor("typed-actor-service", typedActor) - println("### registered actor") - trace() + it("should respond to request-reply message") { + clearMessageLogs + val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) + expect("pong") { + actor.requestReply("ping") + } + } - //val actorRef = RemoteActorRef("typed-actor-service", classOf[RemoteTypedActorOneImpl].getName, HOSTNAME, PORT, 5000L, None) - //val myActor = TypedActor.createProxyForRemoteActorRef(classOf[RemoteTypedActorOne], actorRef) - val myActor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", classOf[RemoteTypedActorOneImpl].getName, 5000L, HOSTNAME, PORT) - println("### call one way") - myActor.oneWay() - Thread.sleep(3000) - println("### call one way - done") - */ - //assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) - //actor.stop - /* */ + it("should not recreate registered actors") { + val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) + val numberOfActorsInRegistry = ActorRegistry.actors.length + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + assert(numberOfActorsInRegistry === ActorRegistry.actors.length) + } + + it("should support multiple variants to get the actor from client side") { + var actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", HOSTNAME, PORT) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT, this.getClass().getClassLoader) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + } + + it("should register and unregister typed actors") { + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + server.registerTypedActor("my-test-service", typedActor) + assert(server.typedActors().get("my-test-service") != null) + server.unregisterTypedActor("my-test-service") + assert(server.typedActors().get("my-test-service") == null) + } } - - } diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 7f0ee5dbbc..385c1831a4 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -182,7 +182,6 @@ abstract class TypedActor extends Actor with Proxyable { case Link(proxy) => self.link(proxy) case Unlink(proxy) => self.unlink(proxy) - case method: String => println("### got method") case unexpected => throw new IllegalActorStateException( "Unexpected message [" + unexpected + "] sent to [" + this + "]") } @@ -408,8 +407,11 @@ object TypedActor extends Logging { proxy.asInstanceOf[T] } - // FIXME - def createProxyForRemoteActorRef[T](intfClass: Class[T], actorRef: ActorRef): T = { + /** + * Create a proxy for a RemoteActorRef representing a server managed remote typed actor. + * + */ + private[akka] def createProxyForRemoteActorRef[T](intfClass: Class[T], actorRef: ActorRef): T = { class MyInvocationHandler extends InvocationHandler { def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = { @@ -423,9 +425,6 @@ object TypedActor extends Logging { val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler) val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false) - // TODO: needed? - //typedActor.initialize(proxy) - // TODO: timeout?? AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L)) awProxy.asInstanceOf[T] } @@ -569,6 +568,7 @@ object TypedActor extends Logging { private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint] } + /** * AspectWerkz Aspect that is turning POJO into proxy to a server managed remote TypedActor. *

@@ -578,103 +578,18 @@ object TypedActor extends Logging { * @author Jonas Bonér */ @Aspect("perInstance") -private[akka] sealed class ServerManagedTypedActorAspect { - @volatile private var isInitialized = false - @volatile private var isStopped = false - private var interfaceClass: Class[_] = _ - private var actorRef: ActorRef = _ - private var timeout: Long = _ - private var uuid: String = _ - private var remoteAddress: Option[InetSocketAddress] = _ - - //FIXME - +private[akka] sealed class ServerManagedTypedActorAspect extends ActorAspect { + @Around("execution(* *.*(..)) && this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)") def invoke(joinPoint: JoinPoint): AnyRef = { - println("### MyAspect intercepted " + joinPoint.getSignature) if (!isInitialized) initialize(joinPoint) remoteDispatch(joinPoint) } - - private def remoteDispatch(joinPoint: JoinPoint): AnyRef = { - val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] - val isOneWay = TypedActor.isOneWay(methodRtti) - - val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues) - - println("### remote dispatch...") - - val future = RemoteClientModule.send[AnyRef]( - message, None, None, remoteAddress.get, - timeout, isOneWay, actorRef, - Some((interfaceClass.getName, methodRtti.getMethod.getName)), - ActorType.TypedActor) - - if (isOneWay) null // for void methods - else { - if (future.isDefined) { - future.get.await - val result = getResultOrThrowException(future.get) - if (result.isDefined) result.get - else throw new IllegalActorStateException("No result returned from call to [" + joinPoint + "]") - } else throw new IllegalActorStateException("No future returned from call to [" + joinPoint + "]") - } - } - - /* - private def remoteDispatch(joinPoint: JoinPoint): AnyRef = { - val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] - val isOneWay = TypedActor.isOneWay(methodRtti) - val senderActorRef = Some(SenderContextInfo.senderActorRef.value) - - - if (!actorRef.isRunning && !isStopped) { - isStopped = true - // FIXME - what to do? - // joinPoint.proceed - null - } else if (isOneWay) { - actorRef.!("joinPoint") - //actorRef.!(joinPoint)(senderActorRef) - null.asInstanceOf[AnyRef] - } else if (TypedActor.returnsFuture_?(methodRtti)) { - actorRef.!!!(joinPoint, timeout)(senderActorRef) - } else { - val result = (actorRef.!!(joinPoint, timeout)(senderActorRef)).as[AnyRef] - if (result.isDefined) result.get - else throw new ActorTimeoutException("Invocation to [" + joinPoint + "] timed out.") - } - } - */ - private def getResultOrThrowException[T](future: Future[T]): Option[T] = - if (future.exception.isDefined) throw future.exception.get - else future.result - - private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = { - var isEscaped = false - val escapedArgs = for (arg <- args) yield { - val clazz = arg.getClass - if (clazz.getName.contains(TypedActor.AW_PROXY_PREFIX)) { - isEscaped = true - TypedActor.AW_PROXY_PREFIX + clazz.getSuperclass.getName - } else arg - } - (escapedArgs, isEscaped) - } - - - private def initialize(joinPoint: JoinPoint): Unit = { - val init = AspectInitRegistry.initFor(joinPoint.getThis) - interfaceClass = init.interfaceClass - actorRef = init.actorRef - uuid = actorRef.uuid + override def initialize(joinPoint: JoinPoint): Unit = { + super.initialize(joinPoint) remoteAddress = actorRef.remoteAddress - println("### address= " + remoteAddress.get) - timeout = init.timeout - isInitialized = true } - } /** @@ -686,16 +601,8 @@ private[akka] sealed class ServerManagedTypedActorAspect { * @author Jonas Bonér */ @Aspect("perInstance") -private[akka] sealed class TypedActorAspect { - @volatile private var isInitialized = false - @volatile private var isStopped = false - private var interfaceClass: Class[_] = _ - private var typedActor: TypedActor = _ - private var actorRef: ActorRef = _ - private var remoteAddress: Option[InetSocketAddress] = _ - private var timeout: Long = _ - private var uuid: String = _ - +private[akka] sealed class TypedActorAspect extends ActorAspect { + @Around("execution(* *.*(..)) && !this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)") def invoke(joinPoint: JoinPoint): AnyRef = { if (!isInitialized) initialize(joinPoint) @@ -706,12 +613,26 @@ private[akka] sealed class TypedActorAspect { if (remoteAddress.isDefined) remoteDispatch(joinPoint) else localDispatch(joinPoint) } +} - private def localDispatch(joinPoint: JoinPoint): AnyRef = { - val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] - val isOneWay = TypedActor.isOneWay(methodRtti) +/** + * Base class for TypedActorAspect and ServerManagedTypedActorAspect to reduce code duplication. + */ +private[akka] abstract class ActorAspect { + @volatile protected var isInitialized = false + @volatile protected var isStopped = false + protected var interfaceClass: Class[_] = _ + protected var typedActor: TypedActor = _ + protected var actorRef: ActorRef = _ + protected var timeout: Long = _ + protected var uuid: String = _ + protected var remoteAddress: Option[InetSocketAddress] = _ + + protected def localDispatch(joinPoint: JoinPoint): AnyRef = { + val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] + val isOneWay = TypedActor.isOneWay(methodRtti) val senderActorRef = Some(SenderContextInfo.senderActorRef.value) - val senderProxy = Some(SenderContextInfo.senderProxy.value) + val senderProxy = Some(SenderContextInfo.senderProxy.value) typedActor.context._sender = senderProxy if (!actorRef.isRunning && !isStopped) { @@ -732,7 +653,7 @@ private[akka] sealed class TypedActorAspect { } } - private def remoteDispatch(joinPoint: JoinPoint): AnyRef = { + protected def remoteDispatch(joinPoint: JoinPoint): AnyRef = { val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti] val isOneWay = TypedActor.isOneWay(methodRtti) @@ -771,7 +692,7 @@ private[akka] sealed class TypedActorAspect { (escapedArgs, isEscaped) } - private def initialize(joinPoint: JoinPoint): Unit = { + protected def initialize(joinPoint: JoinPoint): Unit = { val init = AspectInitRegistry.initFor(joinPoint.getThis) interfaceClass = init.interfaceClass typedActor = init.targetInstance @@ -839,4 +760,7 @@ private[akka] sealed case class AspectInit( } +/** + * Marker interface for server manager typed actors. + */ private[akka] sealed trait ServerManagedTypedActor extends TypedActor \ No newline at end of file diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala index 1fcbe0c5ef..10fc40493b 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala @@ -45,7 +45,6 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft fail("expected exception not thrown") } catch { case e: RuntimeException => { - println("#failed") cdl.await assert(SamplePojoImpl._pre) assert(SamplePojoImpl._post) From 869549c590ce91686ad19caf5ea2336bd51388b2 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 7 Sep 2010 11:02:12 +0200 Subject: [PATCH 3/8] Fixing id/uuid misfortune --- .../src/main/scala/remote/RemoteServer.scala | 16 ++++++---------- .../remote/ClientInitiatedRemoteActorSpec.scala | 2 +- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 142670a84a..6337bd9893 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -274,9 +274,9 @@ class RemoteServer extends Logging with ListenerManagement { // TODO: register typed actor in RemoteServer as well /** - * Register Remote Actor by the Actor's 'uuid' field. It starts the Actor if it is not started already. + * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already. */ - def register(actorRef: ActorRef): Unit = register(actorRef.uuid,actorRef) + def register(actorRef: ActorRef): Unit = register(actorRef.id,actorRef) /** * Register Remote Actor by a specific 'id' passed as argument. @@ -295,13 +295,13 @@ class RemoteServer extends Logging with ListenerManagement { } /** - * Unregister Remote Actor that is registered using its 'uuid' field (not custom ID). + * Unregister Remote Actor that is registered using its 'id' field (not custom ID). */ def unregister(actorRef: ActorRef):Unit = synchronized { if (_isRunning) { log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid) val actorMap = actors() - actorMap remove actorRef.uuid + actorMap remove actorRef.id if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid } } @@ -325,12 +325,8 @@ class RemoteServer extends Logging with ListenerManagement { protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) - private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = { - RemoteServer.actorsFor(address).actors - } - private[akka] def typedActors() : ConcurrentHashMap[String, AnyRef] = { - RemoteServer.actorsFor(address).typedActors - } + private[akka] def actors() = RemoteServer.actorsFor(address).actors + private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors } object RemoteServerSslContext { diff --git a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala index 7ff46ab910..e03259e573 100644 --- a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala @@ -103,7 +103,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendTo = actor sender.start sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendOff - assert(SendOneWayAndReplySenderActor.latch.await(1, TimeUnit.SECONDS)) + assert(SendOneWayAndReplySenderActor.latch.await(3, TimeUnit.SECONDS)) assert(sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].state.isDefined === true) assert("World" === sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].state.get.asInstanceOf[String]) actor.stop From 40a66050648079acdf14d553f8e74e6ceda88e3f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 7 Sep 2010 11:02:40 +0200 Subject: [PATCH 4/8] Removing boilerplate in ReflectiveAccess --- .../main/scala/util/ReflectiveAccess.scala | 66 ++++++++----------- 1 file changed, 26 insertions(+), 40 deletions(-) diff --git a/akka-actor/src/main/scala/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/util/ReflectiveAccess.scala index 4582304188..878403d026 100644 --- a/akka-actor/src/main/scala/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/util/ReflectiveAccess.scala @@ -29,6 +29,9 @@ object ReflectiveAccess { def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled def ensureJtaEnabled = JtaModule.ensureJtaEnabled + private val noParams = Array[Class[_]]() + private val noArgs = Array[AnyRef]() + /** * Reflective access to the RemoteClient module. * @@ -62,14 +65,8 @@ object ReflectiveAccess { def ensureRemotingEnabled = if (!isRemotingEnabled) throw new ModuleNotAvailableException( "Can't load the remoting module, make sure that akka-remote.jar is on the classpath") - val remoteClientObjectInstance: Option[RemoteClientObject] = { - try { - val clazz = loader.loadClass("se.scalablesolutions.akka.remote.RemoteClient$") - val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*) - ctor.setAccessible(true) - Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[RemoteClientObject]) - } catch { case e: Exception => None } - } + val remoteClientObjectInstance: Option[RemoteClientObject] = + createInstance("se.scalablesolutions.akka.remote.RemoteClient$") def register(address: InetSocketAddress, uuid: String) = { ensureRemotingEnabled @@ -126,23 +123,11 @@ object ReflectiveAccess { def unregister(actorRef: ActorRef): Unit } - val remoteServerObjectInstance: Option[RemoteServerObject] = { - try { - val clazz = loader.loadClass("se.scalablesolutions.akka.remote.RemoteServer$") - val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*) - ctor.setAccessible(true) - Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[RemoteServerObject]) - } catch { case e: Exception => None } - } + val remoteServerObjectInstance: Option[RemoteServerObject] = + createInstance("se.scalablesolutions.akka.remote.RemoteServer$") - val remoteNodeObjectInstance: Option[RemoteNodeObject] = { - try { - val clazz = loader.loadClass("se.scalablesolutions.akka.remote.RemoteNode$") - val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*) - ctor.setAccessible(true) - Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[RemoteNodeObject]) - } catch { case e: Exception => None } - } + val remoteNodeObjectInstance: Option[RemoteNodeObject] = + createInstance("se.scalablesolutions.akka.remote.RemoteNode$") def registerActor(address: InetSocketAddress, uuid: String, actorRef: ActorRef) = { ensureRemotingEnabled @@ -177,14 +162,8 @@ object ReflectiveAccess { def ensureTypedActorEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException( "Can't load the typed actor module, make sure that akka-typed-actor.jar is on the classpath") - val typedActorObjectInstance: Option[TypedActorObject] = { - try { - val clazz = loader.loadClass("se.scalablesolutions.akka.actor.TypedActor$") - val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*) - ctor.setAccessible(true) - Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[TypedActorObject]) - } catch { case e: Exception => None } - } + val typedActorObjectInstance: Option[TypedActorObject] = + createInstance("se.scalablesolutions.akka.actor.TypedActor$") def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = { ensureTypedActorEnabled @@ -212,18 +191,25 @@ object ReflectiveAccess { def ensureJtaEnabled = if (!isJtaEnabled) throw new ModuleNotAvailableException( "Can't load the typed actor module, make sure that akka-jta.jar is on the classpath") - val transactionContainerObjectInstance: Option[TransactionContainerObject] = { - try { - val clazz = loader.loadClass("se.scalablesolutions.akka.actor.TransactionContainer$") - val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*) - ctor.setAccessible(true) - Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[TransactionContainerObject]) - } catch { case e: Exception => None } - } + val transactionContainerObjectInstance: Option[TransactionContainerObject] = + createInstance("se.scalablesolutions.akka.actor.TransactionContainer$") def createTransactionContainer: TransactionContainer = { ensureJtaEnabled transactionContainerObjectInstance.get.apply.asInstanceOf[TransactionContainer] } } + + protected def createInstance[T](fqn: String, + ctorSpec: Array[Class[_]] = noParams, + ctorArgs: Array[AnyRef] = noArgs): Option[T] = try { + val clazz = loader.loadClass(fqn) + val ctor = clazz.getDeclaredConstructor(ctorSpec: _*) + ctor.setAccessible(true) + Some(ctor.newInstance(ctorArgs: _*).asInstanceOf[T]) + } catch { + case e: Exception => + Logger("createInstance").error(e, "Couldn't load [%s(%s) => %s(%s)]",fqn,ctorSpec.mkString(", "),fqn,ctorArgs.mkString(", ")) + None + } } From bfb612908b092ee81c34bc3bfc6612744d2ceb22 Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Thu, 9 Sep 2010 10:42:03 +0200 Subject: [PATCH 5/8] closing ticket 378 --- .../src/main/scala/remote/RemoteServer.scala | 5 + .../akka/spring/akka-1.0-SNAPSHOT.xsd | 62 ++++++- .../scala/ActorBeanDefinitionParser.scala | 72 +++++++ .../src/main/scala/ActorFactoryBean.scala | 73 +++++++- akka-spring/src/main/scala/ActorParser.scala | 175 +++++++++++++++++- .../src/main/scala/ActorProperties.scala | 29 ++- .../src/main/scala/AkkaNamespaceHandler.scala | 11 +- .../scala/AkkaSpringConfigurationTags.scala | 7 + akka-spring/src/main/scala/BeanParser.scala | 42 ----- .../src/main/scala/DispatcherParser.scala | 101 ---------- .../src/main/scala/PropertyEntries.scala | 16 ++ .../src/main/scala/PropertyEntry.scala | 19 -- .../TypedActorBeanDefinitionParser.scala | 31 ---- .../UntypedActorBeanDefinitionParser.scala | 31 ---- .../akka/spring/foo/IMyPojo.java | 10 +- .../akka/spring/foo/MyPojo.java | 52 +++--- .../akka/spring/foo/PingActor.java | 10 +- .../test/resources/server-managed-config.xml | 57 ++++++ .../src/test/resources/typed-actor-config.xml | 2 +- .../test/resources/untyped-actor-config.xml | 2 +- .../TypedActorBeanDefinitionParserTest.scala | 16 +- .../scala/TypedActorSpringFeatureTest.scala | 123 +++++++++--- .../scala/UntypedActorSpringFeatureTest.scala | 140 ++++++++++---- .../src/main/scala/actor/TypedActor.scala | 3 +- .../src/test/resources/META-INF/aop.xml | 1 + 25 files changed, 741 insertions(+), 349 deletions(-) create mode 100644 akka-spring/src/main/scala/ActorBeanDefinitionParser.scala delete mode 100644 akka-spring/src/main/scala/BeanParser.scala delete mode 100644 akka-spring/src/main/scala/DispatcherParser.scala delete mode 100644 akka-spring/src/main/scala/PropertyEntry.scala delete mode 100644 akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala delete mode 100644 akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala create mode 100644 akka-spring/src/test/resources/server-managed-config.xml diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index bf9b38ca1b..fa57bda71b 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -271,6 +271,11 @@ class RemoteServer extends Logging with ListenerManagement { } } + /** + * Register typed actor by interface name. + */ + def registerTypedActor(intfClass: Class[_], typedActor: AnyRef) : Unit = registerTypedActor(intfClass.getName, typedActor) + /** * Register remote typed actor by a specific id. * @param id custom actor id diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd index c3d7608bee..80b37c41f5 100644 --- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd +++ b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd @@ -66,6 +66,14 @@ + + + + + + + + @@ -107,6 +115,20 @@ + + + + Management type for remote actors: client managed or server managed. + + + + + + + Custom service name for server managed actor. + + + @@ -135,7 +157,7 @@ - Theh default timeout for '!!' invocations. + The default timeout for '!!' invocations. @@ -229,6 +251,41 @@ + + + + + + + + Name of the remote host. + + + + + + + Port of the remote host. + + + + + + + Custom service name or class name for the server managed actor. + + + + + + + Name of the interface the typed actor implements. + + + + + + @@ -294,4 +351,7 @@ + + + diff --git a/akka-spring/src/main/scala/ActorBeanDefinitionParser.scala b/akka-spring/src/main/scala/ActorBeanDefinitionParser.scala new file mode 100644 index 0000000000..55aa82b8e4 --- /dev/null +++ b/akka-spring/src/main/scala/ActorBeanDefinitionParser.scala @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring + +import org.springframework.beans.factory.support.BeanDefinitionBuilder +import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser +import org.springframework.beans.factory.xml.ParserContext +import AkkaSpringConfigurationTags._ +import org.w3c.dom.Element + + +/** + * Parser for custom namespace configuration. + * @author michaelkober + */ +class TypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser { + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) + */ + override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { + val typedActorConf = parseActor(element) + typedActorConf.typed = TYPED_ACTOR_TAG + typedActorConf.setAsProperties(builder) + } + + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) + */ + override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean] +} + + +/** + * Parser for custom namespace configuration. + * @author michaelkober + */ +class UntypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser { + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) + */ + override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { + val untypedActorConf = parseActor(element) + untypedActorConf.typed = UNTYPED_ACTOR_TAG + untypedActorConf.setAsProperties(builder) + } + + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) + */ + override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean] +} + + +/** + * Parser for custom namespace configuration. + * @author michaelkober + */ +class ActorForBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorForParser { + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) + */ + override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { + val actorForConf = parseActorFor(element) + actorForConf.setAsProperties(builder) + } + + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) + */ + override def getBeanClass(element: Element): Class[_] = classOf[ActorForFactoryBean] +} diff --git a/akka-spring/src/main/scala/ActorFactoryBean.scala b/akka-spring/src/main/scala/ActorFactoryBean.scala index 11d5274a70..c47efcdb78 100644 --- a/akka-spring/src/main/scala/ActorFactoryBean.scala +++ b/akka-spring/src/main/scala/ActorFactoryBean.scala @@ -4,22 +4,19 @@ package se.scalablesolutions.akka.spring -import java.beans.PropertyDescriptor -import java.lang.reflect.Method -import javax.annotation.PreDestroy -import javax.annotation.PostConstruct - import org.springframework.beans.{BeanUtils,BeansException,BeanWrapper,BeanWrapperImpl} -import org.springframework.beans.factory.BeanFactory +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer} +//import org.springframework.beans.factory.BeanFactory import org.springframework.beans.factory.config.AbstractFactoryBean import org.springframework.context.{ApplicationContext,ApplicationContextAware} -import org.springframework.util.ReflectionUtils +//import org.springframework.util.ReflectionUtils import org.springframework.util.StringUtils import se.scalablesolutions.akka.actor.{ActorRef, AspectInitRegistry, TypedActorConfiguration, TypedActor,Actor} import se.scalablesolutions.akka.dispatch.MessageDispatcher import se.scalablesolutions.akka.util.{Logging, Duration} import scala.reflect.BeanProperty +import java.net.InetSocketAddress /** * Exception to use when something goes wrong during bean creation. @@ -49,6 +46,8 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App @BeanProperty var transactional: Boolean = false @BeanProperty var host: String = "" @BeanProperty var port: Int = _ + @BeanProperty var serverManaged: Boolean = false + @BeanProperty var serviceName: String = "" @BeanProperty var lifecycle: String = "" @BeanProperty var dispatcher: DispatcherProperties = _ @BeanProperty var scope: String = VAL_SCOPE_SINGLETON @@ -94,7 +93,16 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App if (implementation == null || implementation == "") throw new AkkaBeansException( "The 'implementation' part of the 'akka:typed-actor' element in the Spring config file can't be null or empty string") - TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig) + val typedActor: AnyRef = TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig) + if (isRemote && serverManaged) { + val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port)) + if (serviceName.isEmpty) { + server.registerTypedActor(interface, typedActor) + } else { + server.registerTypedActor(serviceName, typedActor) + } + } + typedActor } /** @@ -111,7 +119,16 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App actorRef.makeTransactionRequired } if (isRemote) { - actorRef.makeRemote(host, port) + if (serverManaged) { + val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port)) + if (serviceName.isEmpty) { + server.register(actorRef) + } else { + server.register(serviceName, actorRef) + } + } else { + actorRef.makeRemote(host, port) + } } if (hasDispatcher) { if (dispatcher.dispatcherType != THREAD_BASED){ @@ -159,7 +176,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App private[akka] def createConfig: TypedActorConfiguration = { val config = new TypedActorConfiguration().timeout(Duration(timeout, "millis")) if (transactional) config.makeTransactionRequired - if (isRemote) config.makeRemote(host, port) + if (isRemote && !serverManaged) config.makeRemote(host, port) if (hasDispatcher) { if (dispatcher.dispatcherType != THREAD_BASED) { config.dispatcher(dispatcherInstance()) @@ -191,3 +208,39 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App } } } + +/** + * Factory bean for remote client actor-for. + * + * @author michaelkober + */ +class ActorForFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with ApplicationContextAware { + import StringReflect._ + import AkkaSpringConfigurationTags._ + + @BeanProperty var interface: String = "" + @BeanProperty var host: String = "" + @BeanProperty var port: Int = _ + @BeanProperty var serviceName: String = "" + //@BeanProperty var scope: String = VAL_SCOPE_SINGLETON + @BeanProperty var applicationContext: ApplicationContext = _ + + override def isSingleton = false + + /* + * @see org.springframework.beans.factory.FactoryBean#getObjectType() + */ + def getObjectType: Class[AnyRef] = classOf[AnyRef] + + /* + * @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance() + */ + def createInstance: AnyRef = { + if (interface.isEmpty) { + RemoteClient.actorFor(serviceName, host, port) + } else { + RemoteClient.typedActorFor(interface.toClass, serviceName, host, port) + } + } +} + diff --git a/akka-spring/src/main/scala/ActorParser.scala b/akka-spring/src/main/scala/ActorParser.scala index 69073bd52f..8736b807d1 100644 --- a/akka-spring/src/main/scala/ActorParser.scala +++ b/akka-spring/src/main/scala/ActorParser.scala @@ -6,6 +6,7 @@ package se.scalablesolutions.akka.spring import org.springframework.util.xml.DomUtils import org.w3c.dom.Element import scala.collection.JavaConversions._ +import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.actor.IllegalActorStateException @@ -27,11 +28,17 @@ trait ActorParser extends BeanParser with DispatcherParser { val objectProperties = new ActorProperties() val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG); val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG) - val propertyEntries = DomUtils.getChildElementsByTagName(element,PROPERTYENTRY_TAG) + val propertyEntries = DomUtils.getChildElementsByTagName(element, PROPERTYENTRY_TAG) if (remoteElement != null) { objectProperties.host = mandatory(remoteElement, HOST) objectProperties.port = mandatory(remoteElement, PORT).toInt + objectProperties.serverManaged = (remoteElement.getAttribute(MANAGED_BY) != null) && (remoteElement.getAttribute(MANAGED_BY).equals(SERVER_MANAGED)) + val serviceName = remoteElement.getAttribute(SERVICE_NAME) + if ((serviceName != null) && (!serviceName.isEmpty)) { + objectProperties.serviceName = serviceName + objectProperties.serverManaged = true + } } if (dispatcherElement != null) { @@ -43,7 +50,7 @@ trait ActorParser extends BeanParser with DispatcherParser { val entry = new PropertyEntry entry.name = element.getAttribute("name"); entry.value = element.getAttribute("value") - entry.ref = element.getAttribute("ref") + entry.ref = element.getAttribute("ref") objectProperties.propertyEntries.add(entry) } @@ -59,15 +66,13 @@ trait ActorParser extends BeanParser with DispatcherParser { objectProperties.target = mandatory(element, IMPLEMENTATION) objectProperties.transactional = if (element.getAttribute(TRANSACTIONAL).isEmpty) false else element.getAttribute(TRANSACTIONAL).toBoolean - if (!element.getAttribute(INTERFACE).isEmpty) { + if (element.hasAttribute(INTERFACE)) { objectProperties.interface = element.getAttribute(INTERFACE) } - - if (!element.getAttribute(LIFECYCLE).isEmpty) { + if (element.hasAttribute(LIFECYCLE)) { objectProperties.lifecycle = element.getAttribute(LIFECYCLE) } - - if (!element.getAttribute(SCOPE).isEmpty) { + if (element.hasAttribute(SCOPE)) { objectProperties.scope = element.getAttribute(SCOPE) } @@ -75,3 +80,159 @@ trait ActorParser extends BeanParser with DispatcherParser { } } + +/** + * Parser trait for custom namespace configuration for RemoteClient actor-for. + * @author michaelkober + */ +trait ActorForParser extends BeanParser { + import AkkaSpringConfigurationTags._ + + /** + * Parses the given element and returns a ActorForProperties. + * @param element dom element to parse + * @return configuration for the typed actor + */ + def parseActorFor(element: Element): ActorForProperties = { + val objectProperties = new ActorForProperties() + + objectProperties.host = mandatory(element, HOST) + objectProperties.port = mandatory(element, PORT).toInt + objectProperties.serviceName = mandatory(element, SERVICE_NAME) + if (element.hasAttribute(INTERFACE)) { + objectProperties.interface = element.getAttribute(INTERFACE) + } + objectProperties + } + +} + +/** + * Base trait with utility methods for bean parsing. + */ +trait BeanParser extends Logging { + + /** + * Get a mandatory element attribute. + * @param element the element with the mandatory attribute + * @param attribute name of the mandatory attribute + */ + def mandatory(element: Element, attribute: String): String = { + if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) { + throw new IllegalArgumentException("Mandatory attribute missing: " + attribute) + } else { + element.getAttribute(attribute) + } + } + + /** + * Get a mandatory child element. + * @param element the parent element + * @param childName name of the mandatory child element + */ + def mandatoryElement(element: Element, childName: String): Element = { + val childElement = DomUtils.getChildElementByTagName(element, childName); + if (childElement == null) { + throw new IllegalArgumentException("Mandatory element missing: ''") + } else { + childElement + } + } + +} + + +/** + * Parser trait for custom namespace for Akka dispatcher configuration. + * @author michaelkober + */ +trait DispatcherParser extends BeanParser { + import AkkaSpringConfigurationTags._ + + /** + * Parses the given element and returns a DispatcherProperties. + * @param element dom element to parse + * @return configuration for the dispatcher + */ + def parseDispatcher(element: Element): DispatcherProperties = { + val properties = new DispatcherProperties() + var dispatcherElement = element + if (hasRef(element)) { + val ref = element.getAttribute(REF) + dispatcherElement = element.getOwnerDocument.getElementById(ref) + if (dispatcherElement == null) { + throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'") + } + } + + properties.dispatcherType = mandatory(dispatcherElement, TYPE) + if (properties.dispatcherType == THREAD_BASED) { + val allowedParentNodes = "akka:typed-actor" :: "akka:untyped-actor" :: "typed-actor" :: "untyped-actor" :: Nil + if (!allowedParentNodes.contains(dispatcherElement.getParentNode.getNodeName)) { + throw new IllegalArgumentException("Thread based dispatcher must be nested in 'typed-actor' or 'untyped-actor' element!") + } + } + + if (properties.dispatcherType == HAWT) { // no name for HawtDispatcher + properties.name = dispatcherElement.getAttribute(NAME) + if (dispatcherElement.hasAttribute(AGGREGATE)) { + properties.aggregate = dispatcherElement.getAttribute(AGGREGATE).toBoolean + } + } else { + properties.name = mandatory(dispatcherElement, NAME) + } + + val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG); + if (threadPoolElement != null) { + if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN || + properties.dispatcherType == THREAD_BASED) { + throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.") + } + val threadPoolProperties = parseThreadPool(threadPoolElement) + properties.threadPool = threadPoolProperties + } + properties + } + + /** + * Parses the given element and returns a ThreadPoolProperties. + * @param element dom element to parse + * @return configuration for the thread pool + */ + def parseThreadPool(element: Element): ThreadPoolProperties = { + val properties = new ThreadPoolProperties() + properties.queue = element.getAttribute(QUEUE) + if (element.hasAttribute(CAPACITY)) { + properties.capacity = element.getAttribute(CAPACITY).toInt + } + if (element.hasAttribute(BOUND)) { + properties.bound = element.getAttribute(BOUND).toInt + } + if (element.hasAttribute(FAIRNESS)) { + properties.fairness = element.getAttribute(FAIRNESS).toBoolean + } + if (element.hasAttribute(CORE_POOL_SIZE)) { + properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt + } + if (element.hasAttribute(MAX_POOL_SIZE)) { + properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt + } + if (element.hasAttribute(KEEP_ALIVE)) { + properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong + } + if (element.hasAttribute(REJECTION_POLICY)) { + properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY) + } + if (element.hasAttribute(MAILBOX_CAPACITY)) { + properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt + } + properties + } + + def hasRef(element: Element): Boolean = { + val ref = element.getAttribute(REF) + (ref != null) && !ref.isEmpty + } + +} + diff --git a/akka-spring/src/main/scala/ActorProperties.scala b/akka-spring/src/main/scala/ActorProperties.scala index 15c7e61fe0..0f86942935 100644 --- a/akka-spring/src/main/scala/ActorProperties.scala +++ b/akka-spring/src/main/scala/ActorProperties.scala @@ -8,7 +8,7 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder import AkkaSpringConfigurationTags._ /** - * Data container for typed actor configuration data. + * Data container for actor configuration data. * @author michaelkober * @author Martin Krasser */ @@ -20,6 +20,8 @@ class ActorProperties { var transactional: Boolean = false var host: String = "" var port: Int = _ + var serverManaged: Boolean = false + var serviceName: String = "" var lifecycle: String = "" var scope:String = VAL_SCOPE_SINGLETON var dispatcher: DispatcherProperties = _ @@ -34,6 +36,8 @@ class ActorProperties { builder.addPropertyValue("typed", typed) builder.addPropertyValue(HOST, host) builder.addPropertyValue(PORT, port) + builder.addPropertyValue("serverManaged", serverManaged) + builder.addPropertyValue("serviceName", serviceName) builder.addPropertyValue(TIMEOUT, timeout) builder.addPropertyValue(IMPLEMENTATION, target) builder.addPropertyValue(INTERFACE, interface) @@ -45,3 +49,26 @@ class ActorProperties { } } + +/** + * Data container for actor configuration data. + * @author michaelkober + */ +class ActorForProperties { + var interface: String = "" + var host: String = "" + var port: Int = _ + var serviceName: String = "" + + /** + * Sets the properties to the given builder. + * @param builder bean definition builder + */ + def setAsProperties(builder: BeanDefinitionBuilder) { + builder.addPropertyValue(HOST, host) + builder.addPropertyValue(PORT, port) + builder.addPropertyValue("serviceName", serviceName) + builder.addPropertyValue(INTERFACE, interface) + } + +} diff --git a/akka-spring/src/main/scala/AkkaNamespaceHandler.scala b/akka-spring/src/main/scala/AkkaNamespaceHandler.scala index a478b7b262..b1c58baa20 100644 --- a/akka-spring/src/main/scala/AkkaNamespaceHandler.scala +++ b/akka-spring/src/main/scala/AkkaNamespaceHandler.scala @@ -12,10 +12,11 @@ import AkkaSpringConfigurationTags._ */ class AkkaNamespaceHandler extends NamespaceHandlerSupport { def init = { - registerBeanDefinitionParser(TYPED_ACTOR_TAG, new TypedActorBeanDefinitionParser()); - registerBeanDefinitionParser(UNTYPED_ACTOR_TAG, new UntypedActorBeanDefinitionParser()); - registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser()); - registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser()); - registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser); + registerBeanDefinitionParser(TYPED_ACTOR_TAG, new TypedActorBeanDefinitionParser()) + registerBeanDefinitionParser(UNTYPED_ACTOR_TAG, new UntypedActorBeanDefinitionParser()) + registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser()) + registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser()) + registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser) + registerBeanDefinitionParser(ACTOR_FOR_TAG, new ActorForBeanDefinitionParser()); } } diff --git a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala index 2743d772da..2d9807a806 100644 --- a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala +++ b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala @@ -19,6 +19,7 @@ object AkkaSpringConfigurationTags { val DISPATCHER_TAG = "dispatcher" val PROPERTYENTRY_TAG = "property" val CAMEL_SERVICE_TAG = "camel-service" + val ACTOR_FOR_TAG = "actor-for" // actor sub tags val REMOTE_TAG = "remote" @@ -45,6 +46,8 @@ object AkkaSpringConfigurationTags { val TRANSACTIONAL = "transactional" val HOST = "host" val PORT = "port" + val MANAGED_BY = "managed-by" + val SERVICE_NAME = "service-name" val LIFECYCLE = "lifecycle" val SCOPE = "scope" @@ -103,4 +106,8 @@ object AkkaSpringConfigurationTags { val THREAD_BASED = "thread-based" val HAWT = "hawt" + // managed by types + val SERVER_MANAGED = "server" + val CLIENT_MANAGED = "client" + } diff --git a/akka-spring/src/main/scala/BeanParser.scala b/akka-spring/src/main/scala/BeanParser.scala deleted file mode 100644 index 1bbba9f09f..0000000000 --- a/akka-spring/src/main/scala/BeanParser.scala +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -import se.scalablesolutions.akka.util.Logging -import org.w3c.dom.Element -import org.springframework.util.xml.DomUtils - -/** - * Base trait with utility methods for bean parsing. - */ -trait BeanParser extends Logging { - - /** - * Get a mandatory element attribute. - * @param element the element with the mandatory attribute - * @param attribute name of the mandatory attribute - */ - def mandatory(element: Element, attribute: String): String = { - if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) { - throw new IllegalArgumentException("Mandatory attribute missing: " + attribute) - } else { - element.getAttribute(attribute) - } - } - - /** - * Get a mandatory child element. - * @param element the parent element - * @param childName name of the mandatory child element - */ - def mandatoryElement(element: Element, childName: String): Element = { - val childElement = DomUtils.getChildElementByTagName(element, childName); - if (childElement == null) { - throw new IllegalArgumentException("Mandatory element missing: ''") - } else { - childElement - } - } - -} diff --git a/akka-spring/src/main/scala/DispatcherParser.scala b/akka-spring/src/main/scala/DispatcherParser.scala deleted file mode 100644 index e9f10e1328..0000000000 --- a/akka-spring/src/main/scala/DispatcherParser.scala +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -import org.w3c.dom.Element -import org.springframework.util.xml.DomUtils - -/** - * Parser trait for custom namespace for Akka dispatcher configuration. - * @author michaelkober - */ -trait DispatcherParser extends BeanParser { - import AkkaSpringConfigurationTags._ - - /** - * Parses the given element and returns a DispatcherProperties. - * @param element dom element to parse - * @return configuration for the dispatcher - */ - def parseDispatcher(element: Element): DispatcherProperties = { - val properties = new DispatcherProperties() - var dispatcherElement = element - if (hasRef(element)) { - val ref = element.getAttribute(REF) - dispatcherElement = element.getOwnerDocument.getElementById(ref) - if (dispatcherElement == null) { - throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'") - } - } - - properties.dispatcherType = mandatory(dispatcherElement, TYPE) - if (properties.dispatcherType == THREAD_BASED) { - val allowedParentNodes = "akka:typed-actor" :: "akka:untyped-actor" :: "typed-actor" :: "untyped-actor" :: Nil - if (!allowedParentNodes.contains(dispatcherElement.getParentNode.getNodeName)) { - throw new IllegalArgumentException("Thread based dispatcher must be nested in 'typed-actor' or 'untyped-actor' element!") - } - } - - if (properties.dispatcherType == HAWT) { // no name for HawtDispatcher - properties.name = dispatcherElement.getAttribute(NAME) - if (dispatcherElement.hasAttribute(AGGREGATE)) { - properties.aggregate = dispatcherElement.getAttribute(AGGREGATE).toBoolean - } - } else { - properties.name = mandatory(dispatcherElement, NAME) - } - - val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG); - if (threadPoolElement != null) { - if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN || - properties.dispatcherType == THREAD_BASED) { - throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.") - } - val threadPoolProperties = parseThreadPool(threadPoolElement) - properties.threadPool = threadPoolProperties - } - properties - } - - /** - * Parses the given element and returns a ThreadPoolProperties. - * @param element dom element to parse - * @return configuration for the thread pool - */ - def parseThreadPool(element: Element): ThreadPoolProperties = { - val properties = new ThreadPoolProperties() - properties.queue = element.getAttribute(QUEUE) - if (element.hasAttribute(CAPACITY)) { - properties.capacity = element.getAttribute(CAPACITY).toInt - } - if (element.hasAttribute(BOUND)) { - properties.bound = element.getAttribute(BOUND).toInt - } - if (element.hasAttribute(FAIRNESS)) { - properties.fairness = element.getAttribute(FAIRNESS).toBoolean - } - if (element.hasAttribute(CORE_POOL_SIZE)) { - properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt - } - if (element.hasAttribute(MAX_POOL_SIZE)) { - properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt - } - if (element.hasAttribute(KEEP_ALIVE)) { - properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong - } - if (element.hasAttribute(REJECTION_POLICY)) { - properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY) - } - if (element.hasAttribute(MAILBOX_CAPACITY)) { - properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt - } - properties - } - - def hasRef(element: Element): Boolean = { - val ref = element.getAttribute(REF) - (ref != null) && !ref.isEmpty - } - -} diff --git a/akka-spring/src/main/scala/PropertyEntries.scala b/akka-spring/src/main/scala/PropertyEntries.scala index bf1898a805..9a7dc098de 100644 --- a/akka-spring/src/main/scala/PropertyEntries.scala +++ b/akka-spring/src/main/scala/PropertyEntries.scala @@ -18,3 +18,19 @@ class PropertyEntries { entryList.append(entry) } } + +/** + * Represents a property element + * @author Johan Rask + */ +class PropertyEntry { + var name: String = _ + var value: String = null + var ref: String = null + + + override def toString(): String = { + format("name = %s,value = %s, ref = %s", name, value, ref) + } +} + diff --git a/akka-spring/src/main/scala/PropertyEntry.scala b/akka-spring/src/main/scala/PropertyEntry.scala deleted file mode 100644 index 9fe6357fc0..0000000000 --- a/akka-spring/src/main/scala/PropertyEntry.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -/** - * Represents a property element - * @author Johan Rask - */ -class PropertyEntry { - var name: String = _ - var value: String = null - var ref: String = null - - - override def toString(): String = { - format("name = %s,value = %s, ref = %s", name, value, ref) - } -} diff --git a/akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala b/akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala deleted file mode 100644 index e8e0cef7d4..0000000000 --- a/akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -import org.springframework.beans.factory.support.BeanDefinitionBuilder -import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser -import org.springframework.beans.factory.xml.ParserContext -import AkkaSpringConfigurationTags._ -import org.w3c.dom.Element - - -/** - * Parser for custom namespace configuration. - * @author michaelkober - */ -class TypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser { - /* - * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) - */ - override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { - val typedActorConf = parseActor(element) - typedActorConf.typed = TYPED_ACTOR_TAG - typedActorConf.setAsProperties(builder) - } - - /* - * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) - */ - override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean] -} diff --git a/akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala b/akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala deleted file mode 100644 index 752e18559f..0000000000 --- a/akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -import org.springframework.beans.factory.support.BeanDefinitionBuilder -import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser -import org.springframework.beans.factory.xml.ParserContext -import AkkaSpringConfigurationTags._ -import org.w3c.dom.Element - - -/** - * Parser for custom namespace configuration. - * @author michaelkober - */ -class UntypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser { - /* - * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) - */ - override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { - val untypedActorConf = parseActor(element) - untypedActorConf.typed = UNTYPED_ACTOR_TAG - untypedActorConf.setAsProperties(builder) - } - - /* - * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) - */ - override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean] -} diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java index f2c5e24884..5a2a272e6c 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java @@ -8,14 +8,12 @@ package se.scalablesolutions.akka.spring.foo; * To change this template use File | Settings | File Templates. */ public interface IMyPojo { + public void oneWay(String message); + public String getFoo(); - public String getBar(); - - public void preRestart(); - - public void postRestart(); - public String longRunning(); + + } diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java index fe3e9ba767..8f610eef63 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java @@ -1,42 +1,34 @@ package se.scalablesolutions.akka.spring.foo; -import se.scalablesolutions.akka.actor.*; +import se.scalablesolutions.akka.actor.TypedActor; -public class MyPojo extends TypedActor implements IMyPojo{ +import java.util.concurrent.CountDownLatch; - private String foo; - private String bar; +public class MyPojo extends TypedActor implements IMyPojo { + + public static CountDownLatch latch = new CountDownLatch(1); + public static String lastOneWayMessage = null; + private String foo = "foo"; - public MyPojo() { - this.foo = "foo"; - this.bar = "bar"; - } + public MyPojo() { + } + public String getFoo() { + return foo; + } - public String getFoo() { - return foo; - } + public void oneWay(String message) { + lastOneWayMessage = message; + latch.countDown(); + } - - public String getBar() { - return bar; - } - - public void preRestart() { - System.out.println("pre restart"); - } - - public void postRestart() { - System.out.println("post restart"); - } - - public String longRunning() { - try { - Thread.sleep(6000); - } catch (InterruptedException e) { - } - return "this took long"; + public String longRunning() { + try { + Thread.sleep(6000); + } catch (InterruptedException e) { } + return "this took long"; + } } diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java index e447b26a28..3063a1b529 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java @@ -6,6 +6,8 @@ import se.scalablesolutions.akka.actor.ActorRef; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; +import java.util.concurrent.CountDownLatch; + /** * test class @@ -14,6 +16,9 @@ public class PingActor extends UntypedActor implements ApplicationContextAware { private String stringFromVal; private String stringFromRef; + public static String lastMessage = null; + public static CountDownLatch latch = new CountDownLatch(1); + private boolean gotApplicationContext = false; @@ -42,7 +47,6 @@ public class PingActor extends UntypedActor implements ApplicationContextAware { stringFromRef = s; } - private String longRunning() { try { Thread.sleep(6000); @@ -53,12 +57,12 @@ public class PingActor extends UntypedActor implements ApplicationContextAware { public void onReceive(Object message) throws Exception { if (message instanceof String) { - System.out.println("Ping received String message: " + message); + lastMessage = (String) message; if (message.equals("longRunning")) { - System.out.println("### starting pong"); ActorRef pongActor = UntypedActor.actorOf(PongActor.class).start(); pongActor.sendRequestReply("longRunning", getContext()); } + latch.countDown(); } else { throw new IllegalArgumentException("Unknown message: " + message); } diff --git a/akka-spring/src/test/resources/server-managed-config.xml b/akka-spring/src/test/resources/server-managed-config.xml new file mode 100644 index 0000000000..128b16c8b6 --- /dev/null +++ b/akka-spring/src/test/resources/server-managed-config.xml @@ -0,0 +1,57 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/akka-spring/src/test/resources/typed-actor-config.xml b/akka-spring/src/test/resources/typed-actor-config.xml index faca749469..989884e4fa 100644 --- a/akka-spring/src/test/resources/typed-actor-config.xml +++ b/akka-spring/src/test/resources/typed-actor-config.xml @@ -37,7 +37,7 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd"> implementation="se.scalablesolutions.akka.spring.foo.MyPojo" timeout="2000" transactional="true"> - + - + + + + val props = parser.parseActor(dom(xml).getDocumentElement); + assert(props != null) + assert(props.host === "com.some.host") + assert(props.port === 9999) + assert(props.serviceName === "my-service") + assert(props.serverManaged) } } } diff --git a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala index 8767b2e75a..3cdcd17cb0 100644 --- a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala @@ -4,10 +4,8 @@ package se.scalablesolutions.akka.spring -import foo.{IMyPojo, MyPojo} +import foo.{PingActor, IMyPojo, MyPojo} import se.scalablesolutions.akka.dispatch.FutureTimeoutException -import se.scalablesolutions.akka.remote.RemoteNode -import org.scalatest.FeatureSpec import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith @@ -16,13 +14,52 @@ import org.springframework.beans.factory.xml.XmlBeanDefinitionReader import org.springframework.context.ApplicationContext import org.springframework.context.support.ClassPathXmlApplicationContext import org.springframework.core.io.{ClassPathResource, Resource} +import org.scalatest.{BeforeAndAfterAll, FeatureSpec} +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer, RemoteNode} +import java.util.concurrent.CountDownLatch +import se.scalablesolutions.akka.actor.{TypedActor, RemoteTypedActorOne, Actor} +import se.scalablesolutions.akka.actor.remote.RemoteTypedActorOneImpl /** * Tests for spring configuration of typed actors. * @author michaelkober */ @RunWith(classOf[JUnitRunner]) -class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers { +class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with BeforeAndAfterAll { + + var server1: RemoteServer = null + var server2: RemoteServer = null + + override def beforeAll = { + val actor = Actor.actorOf[PingActor] // FIXME: remove this line when ticket 425 is fixed + server1 = new RemoteServer() + server1.start("localhost", 9990) + server2 = new RemoteServer() + server2.start("localhost", 9992) + + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + server1.registerTypedActor("typed-actor-service", typedActor) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterAll = { + try { + server1.shutdown + server2.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + def getTypedActorFromContext(config: String, id: String) : IMyPojo = { + MyPojo.latch = new CountDownLatch(1) + val context = new ClassPathXmlApplicationContext(config) + val myPojo: IMyPojo = context.getBean(id).asInstanceOf[IMyPojo] + myPojo + } + feature("parse Spring application context") { scenario("akka:typed-actor and akka:supervision and akka:dispatcher can be used as top level elements") { @@ -37,41 +74,79 @@ class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers { } scenario("get a typed actor") { - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("simple-typed-actor").asInstanceOf[IMyPojo] - var msg = myPojo.getFoo() - msg += myPojo.getBar() - assert(msg === "foobar") + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello 1") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello 1") } scenario("FutureTimeoutException when timed out") { - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("simple-typed-actor").asInstanceOf[IMyPojo] + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor") evaluating {myPojo.longRunning()} should produce[FutureTimeoutException] - } scenario("typed-actor with timeout") { - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("simple-typed-actor-long-timeout").asInstanceOf[IMyPojo] + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor-long-timeout") assert(myPojo.longRunning() === "this took long"); } scenario("transactional typed-actor") { - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("transactional-typed-actor").asInstanceOf[IMyPojo] - var msg = myPojo.getFoo() - msg += myPojo.getBar() - assert(msg === "foobar") + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "transactional-typed-actor") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello 2") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello 2") } scenario("get a remote typed-actor") { - RemoteNode.start - Thread.sleep(1000) - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("remote-typed-actor").asInstanceOf[IMyPojo] - assert(myPojo.getFoo === "foo") + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "remote-typed-actor") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello 3") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello 3") } + + scenario("get a client-managed-remote-typed-actor") { + val myPojo = getTypedActorFromContext("/server-managed-config.xml", "client-managed-remote-typed-actor") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello client-managed-remote-typed-actor") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello client-managed-remote-typed-actor") + } + + scenario("get a server-managed-remote-typed-actor") { + val serverPojo = getTypedActorFromContext("/server-managed-config.xml", "server-managed-remote-typed-actor") + // + val myPojoProxy = RemoteClient.typedActorFor(classOf[IMyPojo], classOf[IMyPojo].getName, 5000L, "localhost", 9990) + assert(myPojoProxy.getFoo() === "foo") + myPojoProxy.oneWay("hello server-managed-remote-typed-actor") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello server-managed-remote-typed-actor") + } + + scenario("get a server-managed-remote-typed-actor-custom-id") { + val serverPojo = getTypedActorFromContext("/server-managed-config.xml", "server-managed-remote-typed-actor-custom-id") + // + val myPojoProxy = RemoteClient.typedActorFor(classOf[IMyPojo], "mypojo-service", 5000L, "localhost", 9990) + assert(myPojoProxy.getFoo() === "foo") + myPojoProxy.oneWay("hello server-managed-remote-typed-actor 2") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello server-managed-remote-typed-actor 2") + } + + scenario("get a client proxy for server-managed-remote-typed-actor") { + MyPojo.latch = new CountDownLatch(1) + val context = new ClassPathXmlApplicationContext("/server-managed-config.xml") + val myPojo: IMyPojo = context.getBean("server-managed-remote-typed-actor-custom-id").asInstanceOf[IMyPojo] + // get client proxy from spring context + val myPojoProxy = context.getBean("typed-client-1").asInstanceOf[IMyPojo] + assert(myPojoProxy.getFoo() === "foo") + myPojoProxy.oneWay("hello") + MyPojo.latch.await + } + + } } diff --git a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala index cf7d8d9805..0397d30bf0 100644 --- a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala @@ -6,74 +6,146 @@ package se.scalablesolutions.akka.spring import foo.PingActor import se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher -import se.scalablesolutions.akka.remote.RemoteNode -import se.scalablesolutions.akka.actor.ActorRef -import org.scalatest.FeatureSpec import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import org.springframework.context.ApplicationContext import org.springframework.context.support.ClassPathXmlApplicationContext +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer} +import org.scalatest.{BeforeAndAfterAll, FeatureSpec} +import java.util.concurrent.CountDownLatch +import se.scalablesolutions.akka.actor.{RemoteActorRef, ActorRegistry, Actor, ActorRef} /** * Tests for spring configuration of typed actors. * @author michaelkober */ @RunWith(classOf[JUnitRunner]) -class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers { +class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with BeforeAndAfterAll { + + var server1: RemoteServer = null + var server2: RemoteServer = null + + + override def beforeAll = { + val actor = Actor.actorOf[PingActor] // FIXME: remove this line when ticket 425 is fixed + server1 = new RemoteServer() + server1.start("localhost", 9990) + server2 = new RemoteServer() + server2.start("localhost", 9992) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterAll = { + try { + server1.shutdown + server2.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + + def getPingActorFromContext(config: String, id: String) : ActorRef = { + PingActor.latch = new CountDownLatch(1) + val context = new ClassPathXmlApplicationContext(config) + val pingActor = context.getBean(id).asInstanceOf[ActorRef] + assert(pingActor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") + pingActor.start() + } + + feature("parse Spring application context") { scenario("get a untyped actor") { - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("simple-untyped-actor").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor") myactor.sendOneWay("Hello") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello") assert(myactor.isDefinedAt("some string message")) } scenario("untyped-actor with timeout") { - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("simple-untyped-actor-long-timeout").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() - myactor.sendOneWay("Hello") + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor-long-timeout") assert(myactor.getTimeout() === 10000) + myactor.sendOneWay("Hello 2") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello 2") } scenario("transactional untyped-actor") { - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("transactional-untyped-actor").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() - myactor.sendOneWay("Hello") - assert(myactor.isDefinedAt("some string message")) + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "transactional-untyped-actor") + myactor.sendOneWay("Hello 3") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello 3") } scenario("get a remote typed-actor") { - RemoteNode.start - Thread.sleep(1000) - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("remote-untyped-actor").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() - myactor.sendOneWay("Hello") - assert(myactor.isDefinedAt("some string message")) + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "remote-untyped-actor") + myactor.sendOneWay("Hello 4") assert(myactor.getRemoteAddress().isDefined) assert(myactor.getRemoteAddress().get.getHostName() === "localhost") - assert(myactor.getRemoteAddress().get.getPort() === 9999) + assert(myactor.getRemoteAddress().get.getPort() === 9992) + PingActor.latch.await + assert(PingActor.lastMessage === "Hello 4") } scenario("untyped-actor with custom dispatcher") { - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("untyped-actor-with-dispatcher").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() - myactor.sendOneWay("Hello") + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "untyped-actor-with-dispatcher") assert(myactor.getTimeout() === 1000) assert(myactor.getDispatcher.isInstanceOf[ExecutorBasedEventDrivenWorkStealingDispatcher]) + myactor.sendOneWay("Hello 5") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello 5") } + + scenario("create client managed remote untyped-actor") { + val myactor = getPingActorFromContext("/server-managed-config.xml", "client-managed-remote-untyped-actor") + myactor.sendOneWay("Hello client managed remote untyped-actor") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello client managed remote untyped-actor") + assert(myactor.getRemoteAddress().isDefined) + assert(myactor.getRemoteAddress().get.getHostName() === "localhost") + assert(myactor.getRemoteAddress().get.getPort() === 9990) + } + + scenario("create server managed remote untyped-actor") { + val myactor = getPingActorFromContext("/server-managed-config.xml", "server-managed-remote-untyped-actor") + val nrOfActors = ActorRegistry.actors.length + val actorRef = RemoteClient.actorFor("se.scalablesolutions.akka.spring.foo.PingActor", "localhost", 9990) + actorRef.sendOneWay("Hello server managed remote untyped-actor") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello server managed remote untyped-actor") + assert(ActorRegistry.actors.length === nrOfActors) + } + + scenario("create server managed remote untyped-actor with custom service id") { + val myactor = getPingActorFromContext("/server-managed-config.xml", "server-managed-remote-untyped-actor-custom-id") + val nrOfActors = ActorRegistry.actors.length + val actorRef = RemoteClient.actorFor("ping-service", "localhost", 9990) + actorRef.sendOneWay("Hello server managed remote untyped-actor") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello server managed remote untyped-actor") + assert(ActorRegistry.actors.length === nrOfActors) + } + + scenario("get client actor for server managed remote untyped-actor") { + PingActor.latch = new CountDownLatch(1) + val context = new ClassPathXmlApplicationContext("/server-managed-config.xml") + val pingActor = context.getBean("server-managed-remote-untyped-actor-custom-id").asInstanceOf[ActorRef] + assert(pingActor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") + pingActor.start() + val nrOfActors = ActorRegistry.actors.length + // get client actor ref from spring context + val actorRef = context.getBean("client-1").asInstanceOf[ActorRef] + assert(actorRef.isInstanceOf[RemoteActorRef]) + actorRef.sendOneWay("Hello") + PingActor.latch.await + assert(ActorRegistry.actors.length === nrOfActors) + } + } } diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 385c1831a4..7d393070ec 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -389,7 +389,8 @@ object TypedActor extends Logging { typedActor.initialize(proxy) if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef) - AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, None, config.timeout)) + if (config._host.isDefined) actorRef.makeRemote(config._host.get) + AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout)) actorRef.start proxy.asInstanceOf[T] } diff --git a/akka-typed-actor/src/test/resources/META-INF/aop.xml b/akka-typed-actor/src/test/resources/META-INF/aop.xml index bdc167ca54..be133a51b8 100644 --- a/akka-typed-actor/src/test/resources/META-INF/aop.xml +++ b/akka-typed-actor/src/test/resources/META-INF/aop.xml @@ -2,6 +2,7 @@ + From 5a1e8f52351dc027a855446a31287761e26d46ba Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Mon, 13 Sep 2010 13:31:42 +0200 Subject: [PATCH 6/8] closing ticket #426 --- .../src/main/scala/actor/ActorRef.scala | 4 +- .../src/main/scala/remote/RemoteServer.scala | 49 +++++++++++++++---- .../serialization/SerializationProtocol.scala | 4 +- .../ClientInitiatedRemoteActorSpec.scala | 3 +- .../scala/remote/RemoteTypedActorSpec.scala | 10 ++-- .../ServerInitiatedRemoteActorSpec.scala | 21 +++++++- .../config/TypedActorGuiceConfigurator.scala | 1 - 7 files changed, 72 insertions(+), 20 deletions(-) diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index e78edba96a..8267a93a54 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -1358,7 +1358,7 @@ object RemoteActorSystemMessage { * @author Jonas Bonér */ private[akka] case class RemoteActorRef private[akka] ( - uuuid: String, + classOrServiceName: String, val className: String, val hostname: String, val port: Int, @@ -1369,7 +1369,7 @@ private[akka] case class RemoteActorRef private[akka] ( ensureRemotingEnabled - _uuid = uuuid + id = classOrServiceName timeout = _timeout start diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index fa57bda71b..9c5d21b895 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -292,7 +292,7 @@ class RemoteServer extends Logging with ListenerManagement { /** * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already. */ - def register(actorRef: ActorRef): Unit = register(actorRef.id,actorRef) + def register(actorRef: ActorRef): Unit = register(actorRef.id, actorRef) /** * Register Remote Actor by a specific 'id' passed as argument. @@ -555,6 +555,32 @@ class RemoteServerHandler( } } + /** + * Find a registered actor by ID (default) or UUID. + * Actors are registered by id apart from registering during serialization see SerializationProtocol. + */ + private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = { + val registeredActors = server.actors() + var actorRefOrNull = registeredActors get id + if (actorRefOrNull eq null) { + actorRefOrNull = registeredActors get uuid + } + actorRefOrNull + } + + /** + * Find a registered typed actor by ID (default) or UUID. + * Actors are registered by id apart from registering during serialization see SerializationProtocol. + */ + private def findTypedActorByIdOrUUid(id: String, uuid: String) : AnyRef = { + val registeredActors = server.typedActors() + var actorRefOrNull = registeredActors get id + if (actorRefOrNull eq null) { + actorRefOrNull = registeredActors get uuid + } + actorRefOrNull + } + /** * Creates a new instance of the actor with name, uuid and timeout specified as arguments. * @@ -563,12 +589,14 @@ class RemoteServerHandler( * Does not start the actor. */ private def createActor(actorInfo: ActorInfoProtocol): ActorRef = { - val uuid = actorInfo.getUuid + val ids = actorInfo.getUuid.split(':') + val uuid = ids(0) + val id = ids(1) + val name = actorInfo.getTarget val timeout = actorInfo.getTimeout - val registeredActors = server.actors() - val actorRefOrNull = registeredActors get uuid + val actorRefOrNull = findActorByIdOrUuid(id, uuid) if (actorRefOrNull eq null) { try { @@ -577,9 +605,10 @@ class RemoteServerHandler( else Class.forName(name) val actorRef = Actor.actorOf(clazz.newInstance.asInstanceOf[Actor]) actorRef.uuid = uuid + actorRef.id = id actorRef.timeout = timeout actorRef.remoteAddress = None - registeredActors.put(uuid, actorRef) + server.actors.put(id, actorRef) // register by id actorRef } catch { case e => @@ -591,9 +620,11 @@ class RemoteServerHandler( } private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = { - val uuid = actorInfo.getUuid - val registeredTypedActors = server.typedActors() - val typedActorOrNull = registeredTypedActors get uuid + val ids = actorInfo.getUuid.split(':') + val uuid = ids(0) + val id = ids(1) + + val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid) if (typedActorOrNull eq null) { val typedActorInfo = actorInfo.getTypedActorInfo @@ -610,7 +641,7 @@ class RemoteServerHandler( val newInstance = TypedActor.newInstance( interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef] - registeredTypedActors.put(uuid, newInstance) + server.typedActors.put(id, newInstance) // register by id newInstance } catch { case e => diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 7da001edab..afebae8f3b 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -230,7 +230,7 @@ object RemoteActorSerialization { } RemoteActorRefProtocol.newBuilder - .setUuid(uuid) + .setUuid(uuid + ":" + id) .setActorClassname(actorClass.getName) .setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build) .setTimeout(timeout) @@ -248,7 +248,7 @@ object RemoteActorSerialization { import actorRef._ val actorInfoBuilder = ActorInfoProtocol.newBuilder - .setUuid(uuid) + .setUuid(uuid + ":" + actorRef.id) .setTarget(actorClassName) .setTimeout(timeout) diff --git a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala index e03259e573..6670722b02 100644 --- a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala @@ -93,6 +93,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { actor.stop } + @Test def shouldSendOneWayAndReceiveReply = { val actor = actorOf[SendOneWayAndReplyReceiverActor] @@ -134,6 +135,6 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { assert("Expected exception; to test fault-tolerance" === e.getMessage()) } actor.stop - } + } } diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala index 780828c310..8b28b35f57 100644 --- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -4,10 +4,7 @@ package se.scalablesolutions.akka.actor.remote -import org.scalatest.Spec -import org.scalatest.Assertions import org.scalatest.matchers.ShouldMatchers -import org.scalatest.BeforeAndAfterAll import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith @@ -19,6 +16,7 @@ import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} +import org.scalatest.{BeforeAndAfterEach, Spec, Assertions, BeforeAndAfterAll} object RemoteTypedActorSpec { val HOSTNAME = "localhost" @@ -40,7 +38,7 @@ object RemoteTypedActorLog { class RemoteTypedActorSpec extends Spec with ShouldMatchers with - BeforeAndAfterAll { + BeforeAndAfterEach with BeforeAndAfterAll { import RemoteTypedActorLog._ import RemoteTypedActorSpec._ @@ -82,6 +80,10 @@ class RemoteTypedActorSpec extends ActorRegistry.shutdownAll } + override def afterEach() { + server.typedActors.clear + } + describe("Remote Typed Actor ") { it("should receive one-way message") { diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 012d42f92a..2fe8bad905 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -79,6 +79,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } } + @Test def shouldSendWithBang { val actor = RemoteClient.actorFor( @@ -139,11 +140,29 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { server.register(actorOf[RemoteActorSpecActorUnidirectional]) val actor = RemoteClient.actorFor("se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT) val numberOfActorsInRegistry = ActorRegistry.actors.length - val result = actor ! "OneWay" + actor ! "OneWay" assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) assert(numberOfActorsInRegistry === ActorRegistry.actors.length) actor.stop } + @Test + def shouldUseServiceNameAsIdForRemoteActorRef { + server.register(actorOf[RemoteActorSpecActorUnidirectional]) + server.register("my-service", actorOf[RemoteActorSpecActorUnidirectional]) + val actor1 = RemoteClient.actorFor("se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT) + val actor2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT) + val actor3 = RemoteClient.actorFor("my-service", HOSTNAME, PORT) + + actor1 ! "OneWay" + actor2 ! "OneWay" + actor3 ! "OneWay" + + assert(actor1.uuid != actor2.uuid) + assert(actor1.uuid != actor3.uuid) + assert(actor1.id != actor2.id) + assert(actor2.id == actor3.id) + } + } diff --git a/akka-typed-actor/src/main/scala/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/config/TypedActorGuiceConfigurator.scala index 339c4d297d..5ca249a3ec 100644 --- a/akka-typed-actor/src/main/scala/config/TypedActorGuiceConfigurator.scala +++ b/akka-typed-actor/src/main/scala/config/TypedActorGuiceConfigurator.scala @@ -122,7 +122,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa remoteAddress.foreach { address => actorRef.makeRemote(remoteAddress.get) - RemoteServerModule.registerTypedActor(address, implementationClass.getName, proxy) } AspectInitRegistry.register( From 4d036edc5acec8e4d80356d54c34fa03bf242282 Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Mon, 13 Sep 2010 13:50:31 +0200 Subject: [PATCH 7/8] merged with master --- akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala index 67ab6398a6..0397d30bf0 100644 --- a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala @@ -59,7 +59,6 @@ class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with feature("parse Spring application context") { -<<<<<<< HEAD scenario("get a untyped actor") { val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor") myactor.sendOneWay("Hello") From b5b08e23cfd7972a3ae5bdf2a31ed991a3e37794 Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Mon, 13 Sep 2010 14:04:22 +0200 Subject: [PATCH 8/8] merged with master --- akka-actor/src/main/scala/util/ReflectiveAccess.scala | 3 --- akka-spring/src/main/scala/ActorParser.scala | 3 +-- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/util/ReflectiveAccess.scala index 8bfb7f857e..abccd5d9b0 100644 --- a/akka-actor/src/main/scala/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/util/ReflectiveAccess.scala @@ -29,9 +29,6 @@ object ReflectiveAccess { def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled def ensureJtaEnabled = JtaModule.ensureJtaEnabled - private val noParams = Array[Class[_]]() - private val noArgs = Array[AnyRef]() - /** * Reflective access to the RemoteClient module. * diff --git a/akka-spring/src/main/scala/ActorParser.scala b/akka-spring/src/main/scala/ActorParser.scala index 8736b807d1..9858e9ad7e 100644 --- a/akka-spring/src/main/scala/ActorParser.scala +++ b/akka-spring/src/main/scala/ActorParser.scala @@ -184,8 +184,7 @@ trait DispatcherParser extends BeanParser { val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG); if (threadPoolElement != null) { - if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN || - properties.dispatcherType == THREAD_BASED) { + if (properties.dispatcherType == THREAD_BASED) { throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.") } val threadPoolProperties = parseThreadPool(threadPoolElement)