From 67cf3788f35d3d3c5e65b60f89cbeee0dd6e7d3b Mon Sep 17 00:00:00 2001 From: Paul Pacheco Date: Sun, 14 Nov 2010 13:10:13 -0600 Subject: [PATCH 01/10] Added interface for registering session actors, and adding unit test (which is failing now) --- .../src/main/scala/actor/ActorRegistry.scala | 2 + .../src/main/scala/remote/RemoteServer.scala | 33 +++++++++++++ .../ServerInitiatedRemoteActorSpec.scala | 47 +++++++++++++++++++ 3 files changed, 82 insertions(+) diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala index 6751ee9350..0b946df99f 100644 --- a/akka-actor/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala @@ -301,12 +301,14 @@ object ActorRegistry extends ListenerManagement { private[akka] def actors(address: Address) = actorsFor(address).actors private[akka] def actorsByUuid(address: Address) = actorsFor(address).actorsByUuid + private[akka] def actorsFactories(address: Address) = actorsFor(address).actorsFactories private[akka] def typedActors(address: Address) = actorsFor(address).typedActors private[akka] def typedActorsByUuid(address: Address) = actorsFor(address).typedActorsByUuid private[akka] class RemoteActorSet { private[ActorRegistry] val actors = new ConcurrentHashMap[String, ActorRef] private[ActorRegistry] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] + private[ActorRegistry] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef] private[ActorRegistry] val typedActors = new ConcurrentHashMap[String, AnyRef] private[ActorRegistry] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] } diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index c85f2913e0..cead8bf9f8 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -302,6 +302,17 @@ class RemoteServer extends Logging with ListenerManagement { else register(id, actorRef, actors) } + /** + * Register Remote Session Actor by a specific 'id' passed as argument. + *

+ * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. + */ + def registerPerSession(id: String, factory: => ActorRef): Unit = synchronized { + log.debug("Registering server side remote session actor with id [%s]", id) + if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), factory, actorsByUuid) + else registerPerSession(id, () => factory, actorsFactories) + } + private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { if (_isRunning && !registry.contains(id)) { if (!actorRef.isRunning) actorRef.start @@ -309,6 +320,12 @@ class RemoteServer extends Logging with ListenerManagement { } } + private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) { + if (_isRunning && !registry.contains(id)) { + registry.put(id, factory) + } + } + private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) { if (_isRunning && !registry.contains(id)) registry.put(id, typedActor) } @@ -341,6 +358,18 @@ class RemoteServer extends Logging with ListenerManagement { } } + /** + * Unregister Remote Actor by specific 'id'. + *

+ * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregisterPerSession(id: String):Unit = synchronized { + if (_isRunning) { + log.info("Unregistering server side remote session actor with id [%s]", id) + actorsFactories.remove(id) + } + } + /** * Unregister Remote Typed Actor by specific 'id'. *

@@ -358,8 +387,10 @@ class RemoteServer extends Logging with ListenerManagement { protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) + private[akka] def actors = ActorRegistry.actors(address) private[akka] def actorsByUuid = ActorRegistry.actorsByUuid(address) + private[akka] def actorsFactories = ActorRegistry.actorsFactories(address) private[akka] def typedActors = ActorRegistry.typedActors(address) private[akka] def typedActorsByUuid = ActorRegistry.typedActorsByUuid(address) } @@ -429,6 +460,8 @@ class RemoteServerHandler( val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val CHANNEL_INIT = "channel-init".intern + val sessionActors = new ChannelLocal(); + applicationLoader.foreach(MessageSerializer.setClassLoader(_)) /** diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index ef60732852..ff8793218e 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -36,6 +36,22 @@ object ServerInitiatedRemoteActorSpec { } } + case class Login(user:String); + case class GetUser(); + + class RemoteStatefullSessionActorSpec extends Actor { + + var user : String= _; + + def receive = { + case Login(user) => + this.user = user; + case GetUser() => + self.reply(this.user) + } + } + + object RemoteActorSpecActorAsyncSender { val latch = new CountDownLatch(1) } @@ -63,6 +79,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { server.register(actorOf[RemoteActorSpecActorUnidirectional]) server.register(actorOf[RemoteActorSpecActorBidirectional]) server.register(actorOf[RemoteActorSpecActorAsyncSender]) + server.registerPerSession("statefull-session-actor", actorOf[RemoteStatefullSessionActorSpec]) Thread.sleep(1000) } @@ -103,6 +120,29 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { actor.stop } + @Test + def shouldKeepSessionInformation { + val session1 = RemoteClient.actorFor( + "statefull-session-actor", + 5000L, + HOSTNAME, PORT) + val session2 = RemoteClient.actorFor( + "statefull-session-actor", + 5000L, + HOSTNAME, PORT) + + session1 ! Login("session1"); + session2 ! Login("session2"); + + val result1 = session1 !! GetUser(); + assert("session1" === result1.get.asInstanceOf[String]) + val result2 = session2 !! GetUser(); + assert("session2" === result2.get.asInstanceOf[String]) + + session1.stop() + session2.stop() + } + @Test def shouldSendWithBangAndGetReplyThroughSenderRef { implicit val timeout = 500000000L @@ -205,6 +245,13 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { server.unregister("my-service-1") assert(server.actors.get("my-service-1") eq null, "actor unregistered") } + @Test + def shouldRegisterAndUnregisterSession { + server.registerPerSession("my-service-1", actorOf[RemoteActorSpecActorUnidirectional]) + assert(server.actorsFactories.get("my-service-1") ne null, "actor registered") + server.unregisterPerSession("my-service-1") + assert(server.actorsFactories.get("my-service-1") eq null, "actor unregistered") + } @Test def shouldRegisterAndUnregisterByUuid { From 376d1c9dfb7de2c45ad4fbdb6e8c5b6b204e6e77 Mon Sep 17 00:00:00 2001 From: Paul Pacheco Date: Sun, 14 Nov 2010 16:26:33 -0600 Subject: [PATCH 02/10] Added server initiated remote untyped session actors now you can register a factory function and whenever a new session starts, the actor will be created and started. When the client disconnects, the actor will be stopped. The client works the same as any other untyped remote server managed actor. use like this: RemoteServer.registerPerSession("actor-name", actorOf[MyActor]) Unregister like this: RemoteServer.unregisterPerSession("actor-name") --- .../src/main/scala/remote/RemoteServer.scala | 80 +++++++++++----- .../ServerInitiatedRemoteActorSpec.scala | 92 +++++++++++++++++-- 2 files changed, 138 insertions(+), 34 deletions(-) diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 2a92fff853..5ba58567ef 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -458,7 +458,7 @@ class RemoteServerHandler( val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val CHANNEL_INIT = "channel-init".intern - val sessionActors = new ChannelLocal(); + val sessionActors = new ChannelLocal[Map[String, ActorRef]](); applicationLoader.foreach(MessageSerializer.setClassLoader(_)) @@ -470,6 +470,7 @@ class RemoteServerHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) + sessionActors.set(event.getChannel(), Map[String, ActorRef]()); log.debug("Remote client [%s] connected to [%s]", clientAddress, server.name) if (RemoteServer.SECURE) { val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) @@ -489,6 +490,11 @@ class RemoteServerHandler( override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name) + // stop all session actors + for ((id, actorRef) <- sessionActors.get(event.getChannel())) { + actorRef.stop() + } + sessionActors.remove(event.getChannel()); server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) } @@ -543,7 +549,7 @@ class RemoteServerHandler( val actorRef = try { - createActor(actorInfo).start + createActor(actorInfo, channel).start } catch { case e: SecurityException => channel.write(createErrorReplyMessage(e, request, AkkaActorType.ScalaActor)) @@ -664,6 +670,13 @@ class RemoteServerHandler( server.actorsByUuid.get(uuid) } + private def findActorFactory(id: String) : () => ActorRef = { + server.actorsFactories.get(id) + } + private def findSessionActor(id: String, channel: Channel) : ActorRef = { + sessionActors.get(channel).getOrElse(id, null) + } + private def findTypedActorById(id: String) : AnyRef = { server.typedActors.get(id) } @@ -693,7 +706,7 @@ class RemoteServerHandler( * * Does not start the actor. */ - private def createActor(actorInfo: ActorInfoProtocol): ActorRef = { + private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { val uuid = actorInfo.getUuid val id = actorInfo.getId @@ -702,28 +715,47 @@ class RemoteServerHandler( val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - if (actorRefOrNull eq null) { - try { - if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( - "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") + if (actorRefOrNull ne null) + return actorRefOrNull - log.info("Creating a new remote actor [%s:%s]", name, uuid) - val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) - else Class.forName(name) - val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) - actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) - actorRef.id = id - actorRef.timeout = timeout - actorRef.remoteAddress = None - server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid - actorRef - } catch { - case e => - log.error(e, "Could not create remote actor instance") - server.notifyListeners(RemoteServerError(e, server)) - throw e - } - } else actorRefOrNull + + // the actor has not been registered globally. See if we have it in the session + + val sessionActorRefOrNull = findSessionActor(id, channel); + if (sessionActorRefOrNull ne null) + return sessionActorRefOrNull + + // we dont have it in the session either, see if we have a factory for it + val actorFactoryOrNull = findActorFactory(id) + if (actorFactoryOrNull ne null) { + val actorRef = actorFactoryOrNull(); + actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) + sessionActors.get(channel).put(id, actorRef); + server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid + return actorRef + } + + // None of the above, so treat it as a client managed remote actor + try { + if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( + "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") + + log.info("Creating a new remote actor [%s:%s]", name, uuid) + val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) + else Class.forName(name) + val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) + actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) + actorRef.id = id + actorRef.timeout = timeout + actorRef.remoteAddress = None + server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid + actorRef + } catch { + case e => + log.error(e, "Could not create remote actor instance") + server.notifyListeners(RemoteServerError(e, server)) + throw e + } } private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = { diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index ff8793218e..f0e2123310 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -7,12 +7,14 @@ import org.junit.{Test, Before, After} import akka.remote.{RemoteServer, RemoteClient} import akka.actor.Actor._ import akka.actor.{ActorRegistry, ActorRef, Actor} +import scala.collection.mutable.Set object ServerInitiatedRemoteActorSpec { val HOSTNAME = "localhost" val PORT = 9990 var server: RemoteServer = null + case class Send(actor: ActorRef) object RemoteActorSpecActorUnidirectional { @@ -38,16 +40,29 @@ object ServerInitiatedRemoteActorSpec { case class Login(user:String); case class GetUser(); - + case class DoSomethingWeird(); + + val instantiatedSessionActors= Set[ActorRef](); + class RemoteStatefullSessionActorSpec extends Actor { - var user : String= _; + var user : String= "anonymous"; + override def preStart = { + instantiatedSessionActors += self; + } + + override def postStop = { + instantiatedSessionActors -= self; + } + def receive = { case Login(user) => this.user = user; case GetUser() => self.reply(this.user) + case DoSomethingWeird() => + throw new Exception("Bad boy") } } @@ -122,25 +137,82 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { @Test def shouldKeepSessionInformation { + + //RemoteClient.clientFor(HOSTNAME, PORT).connect + val session1 = RemoteClient.actorFor( "statefull-session-actor", 5000L, HOSTNAME, PORT) + + + val default1 = session1 !! GetUser(); + assert("anonymous" === default1.get.asInstanceOf[String]) + + session1 ! Login("session[1]"); + + val result1 = session1 !! GetUser(); + assert("session[1]" === result1.get.asInstanceOf[String]) + + session1.stop() + + RemoteClient.shutdownAll + + //RemoteClient.clientFor(HOSTNAME, PORT).connect + val session2 = RemoteClient.actorFor( "statefull-session-actor", 5000L, HOSTNAME, PORT) - session1 ! Login("session1"); - session2 ! Login("session2"); + // since this is a new session, the server should reset the state + val default2 = session2 !! GetUser(); + assert("anonymous" === default2.get.asInstanceOf[String]) - val result1 = session1 !! GetUser(); - assert("session1" === result1.get.asInstanceOf[String]) - val result2 = session2 !! GetUser(); - assert("session2" === result2.get.asInstanceOf[String]) - - session1.stop() session2.stop() + + RemoteClient.shutdownAll + } + + @Test + def shouldStopActorOnDisconnect{ + + + val session1 = RemoteClient.actorFor( + "statefull-session-actor", + 5000L, + HOSTNAME, PORT) + + + val default1 = session1 !! GetUser(); + assert("anonymous" === default1.get.asInstanceOf[String]) + + assert(instantiatedSessionActors.size == 1); + + RemoteClient.shutdownAll + Thread.sleep(1000) + assert(instantiatedSessionActors.size == 0); + + } + + @Test + def shouldStopActorOnError{ + + + val session1 = RemoteClient.actorFor( + "statefull-session-actor", + 5000L, + HOSTNAME, PORT) + + + session1 ! DoSomethingWeird(); + session1.stop() + + RemoteClient.shutdownAll + Thread.sleep(1000) + + assert(instantiatedSessionActors.size == 0); + } @Test From 16640ebc501f36202326a2e10dc752b07cb436ed Mon Sep 17 00:00:00 2001 From: Paul Pacheco Date: Sun, 14 Nov 2010 18:03:34 -0600 Subject: [PATCH 03/10] Added remote typed session actors, along with unit tests --- .../src/main/scala/actor/ActorRegistry.scala | 2 + .../src/main/scala/remote/RemoteServer.scala | 119 +++++++++++++----- .../akka/actor/RemoteTypedSessionActor.java | 8 ++ .../actor/RemoteTypedSessionActorImpl.java | 49 ++++++++ .../ServerInitiatedRemoteActorSpec.scala | 16 +-- ...InitiatedRemoteTypedSessionActorSpec.scala | 110 ++++++++++++++++ 6 files changed, 268 insertions(+), 36 deletions(-) create mode 100644 akka-remote/src/test/java/akka/actor/RemoteTypedSessionActor.java create mode 100644 akka-remote/src/test/java/akka/actor/RemoteTypedSessionActorImpl.java create mode 100644 akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala index 0b946df99f..f8b46fd3c4 100644 --- a/akka-actor/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala @@ -304,6 +304,7 @@ object ActorRegistry extends ListenerManagement { private[akka] def actorsFactories(address: Address) = actorsFor(address).actorsFactories private[akka] def typedActors(address: Address) = actorsFor(address).typedActors private[akka] def typedActorsByUuid(address: Address) = actorsFor(address).typedActorsByUuid + private[akka] def typedActorsFactories(address: Address) = actorsFor(address).typedActorsFactories private[akka] class RemoteActorSet { private[ActorRegistry] val actors = new ConcurrentHashMap[String, ActorRef] @@ -311,6 +312,7 @@ object ActorRegistry extends ListenerManagement { private[ActorRegistry] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef] private[ActorRegistry] val typedActors = new ConcurrentHashMap[String, AnyRef] private[ActorRegistry] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] + private[ActorRegistry] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef] } } diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 5ba58567ef..9afef140b1 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -284,6 +284,21 @@ class RemoteServer extends Logging with ListenerManagement { else registerTypedActor(id, typedActor, typedActors) } + /** + * Register typed actor by interface name. + */ + def registerTypedPerSessionActor(intfClass: Class[_], factory: => AnyRef) : Unit = registerTypedActor(intfClass.getName, factory) + + /** + * Register remote typed actor by a specific id. + * @param id custom actor id + * @param typedActor typed actor to register + */ + def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit = synchronized { + log.debug("Registering server side typed remote session actor with id [%s]", id) + registerTypedPerSessionActor(id, () => factory, typedActorsFactories) + } + /** * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already. */ @@ -307,8 +322,7 @@ class RemoteServer extends Logging with ListenerManagement { */ def registerPerSession(id: String, factory: => ActorRef): Unit = synchronized { log.debug("Registering server side remote session actor with id [%s]", id) - if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), factory, actorsByUuid) - else registerPerSession(id, () => factory, actorsFactories) + registerPerSession(id, () => factory, actorsFactories) } private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { @@ -328,6 +342,12 @@ class RemoteServer extends Logging with ListenerManagement { if (_isRunning && !registry.contains(id)) registry.put(id, typedActor) } + private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) { + if (_isRunning && !registry.contains(id)) { + registry.put(id, factory) + } + } + /** * Unregister Remote Actor that is registered using its 'id' field (not custom ID). */ @@ -381,6 +401,17 @@ class RemoteServer extends Logging with ListenerManagement { } } + /** + * Unregister Remote Typed Actor by specific 'id'. + *

+ * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregisterTypedPerSessionActor(id: String):Unit = synchronized { + if (_isRunning) { + typedActorsFactories.remove(id) + } + } + protected override def manageLifeCycleOfListeners = false protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) @@ -391,6 +422,7 @@ class RemoteServer extends Logging with ListenerManagement { private[akka] def actorsFactories = ActorRegistry.actorsFactories(address) private[akka] def typedActors = ActorRegistry.typedActors(address) private[akka] def typedActorsByUuid = ActorRegistry.typedActorsByUuid(address) + private[akka] def typedActorsFactories = ActorRegistry.typedActorsFactories(address) } object RemoteServerSslContext { @@ -459,6 +491,7 @@ class RemoteServerHandler( val CHANNEL_INIT = "channel-init".intern val sessionActors = new ChannelLocal[Map[String, ActorRef]](); + val typedSessionActors = new ChannelLocal[Map[String, AnyRef]](); applicationLoader.foreach(MessageSerializer.setClassLoader(_)) @@ -471,6 +504,7 @@ class RemoteServerHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) sessionActors.set(event.getChannel(), Map[String, ActorRef]()); + typedSessionActors.set(event.getChannel(), Map[String, AnyRef]()); log.debug("Remote client [%s] connected to [%s]", clientAddress, server.name) if (RemoteServer.SECURE) { val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) @@ -495,6 +529,11 @@ class RemoteServerHandler( actorRef.stop() } sessionActors.remove(event.getChannel()); + for ((id, actorRef) <- typedSessionActors.get(event.getChannel())) { + TypedActor.stop(actorRef) + } + typedSessionActors.remove(event.getChannel()); + server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) } @@ -623,7 +662,7 @@ class RemoteServerHandler( val typedActorInfo = actorInfo.getTypedActorInfo log.debug("Dispatching to remote typed actor [%s :: %s]", typedActorInfo.getMethod, typedActorInfo.getInterface) - val typedActor = createTypedActor(actorInfo) + val typedActor = createTypedActor(actorInfo, channel) val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList val argClasses = args.map(_.getClass) @@ -673,6 +712,7 @@ class RemoteServerHandler( private def findActorFactory(id: String) : () => ActorRef = { server.actorsFactories.get(id) } + private def findSessionActor(id: String, channel: Channel) : ActorRef = { sessionActors.get(channel).getOrElse(id, null) } @@ -681,6 +721,14 @@ class RemoteServerHandler( server.typedActors.get(id) } + private def findTypedActorFactory(id: String) : () => AnyRef = { + server.typedActorsFactories.get(id) + } + + private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = { + typedSessionActors.get(channel).getOrElse(id, null) + } + private def findTypedActorByUuid(uuid: String) : AnyRef = { server.typedActorsByUuid.get(uuid) } @@ -731,7 +779,6 @@ class RemoteServerHandler( val actorRef = actorFactoryOrNull(); actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) sessionActors.get(channel).put(id, actorRef); - server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid return actorRef } @@ -758,39 +805,55 @@ class RemoteServerHandler( } } - private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = { + private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = { val uuid = actorInfo.getUuid val id = actorInfo.getId val typedActorOrNull = findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) + if (typedActorOrNull ne null) + return typedActorOrNull; - if (typedActorOrNull eq null) { - val typedActorInfo = actorInfo.getTypedActorInfo - val interfaceClassname = typedActorInfo.getInterface - val targetClassname = actorInfo.getTarget + // the actor has not been registered globally. See if we have it in the session - try { - if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( - "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") + val sessionActorRefOrNull = findTypedSessionActor(id, channel); + if (sessionActorRefOrNull ne null) + return sessionActorRefOrNull - log.info("Creating a new remote typed actor:\n\t[%s :: %s]", interfaceClassname, targetClassname) + // we dont have it in the session either, see if we have a factory for it + val actorFactoryOrNull = findTypedActorFactory(id) + if (actorFactoryOrNull ne null) { + val newInstance = actorFactoryOrNull(); + typedSessionActors.get(channel).put(id, newInstance); + return newInstance + } - val (interfaceClass, targetClass) = - if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname), - applicationLoader.get.loadClass(targetClassname)) - else (Class.forName(interfaceClassname), Class.forName(targetClassname)) + // None of the above, so treat it as a client managed remote actor - val newInstance = TypedActor.newInstance( - interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef] - server.typedActors.put(uuidFrom(uuid.getHigh,uuid.getLow).toString, newInstance) // register by uuid - newInstance - } catch { - case e => - log.error(e, "Could not create remote typed actor instance") - server.notifyListeners(RemoteServerError(e, server)) - throw e - } - } else typedActorOrNull + val typedActorInfo = actorInfo.getTypedActorInfo + val interfaceClassname = typedActorInfo.getInterface + val targetClassname = actorInfo.getTarget + + try { + if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( + "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") + + log.info("Creating a new remote typed actor:\n\t[%s :: %s]", interfaceClassname, targetClassname) + + val (interfaceClass, targetClass) = + if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname), + applicationLoader.get.loadClass(targetClassname)) + else (Class.forName(interfaceClassname), Class.forName(targetClassname)) + + val newInstance = TypedActor.newInstance( + interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef] + server.typedActors.put(uuidFrom(uuid.getHigh,uuid.getLow).toString, newInstance) // register by uuid + newInstance + } catch { + case e => + log.error(e, "Could not create remote typed actor instance") + server.notifyListeners(RemoteServerError(e, server)) + throw e + } } private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = { diff --git a/akka-remote/src/test/java/akka/actor/RemoteTypedSessionActor.java b/akka-remote/src/test/java/akka/actor/RemoteTypedSessionActor.java new file mode 100644 index 0000000000..8a6c2e6373 --- /dev/null +++ b/akka-remote/src/test/java/akka/actor/RemoteTypedSessionActor.java @@ -0,0 +1,8 @@ +package akka.actor; + +public interface RemoteTypedSessionActor { + + public void login(String user); + public String getUser(); + public void doSomethingFunny() throws Exception; +} diff --git a/akka-remote/src/test/java/akka/actor/RemoteTypedSessionActorImpl.java b/akka-remote/src/test/java/akka/actor/RemoteTypedSessionActorImpl.java new file mode 100644 index 0000000000..b4140f74ed --- /dev/null +++ b/akka-remote/src/test/java/akka/actor/RemoteTypedSessionActorImpl.java @@ -0,0 +1,49 @@ +package akka.actor.remote; + +import akka.actor.*; + +import java.util.Set; +import java.util.HashSet; + +import java.util.concurrent.CountDownLatch; + +public class RemoteTypedSessionActorImpl extends TypedActor implements RemoteTypedSessionActor { + + + private static Set instantiatedSessionActors = new HashSet(); + + public static Set getInstances() { + return instantiatedSessionActors; + } + + @Override + public void preStart() { + instantiatedSessionActors.add(this); + } + + @Override + public void postStop() { + instantiatedSessionActors.remove(this); + } + + + private String user="anonymous"; + + @Override + public void login(String user) { + this.user = user; + } + + @Override + public String getUser() + { + return this.user; + } + + @Override + public void doSomethingFunny() throws Exception + { + throw new Exception("Bad boy"); + } + +} diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index f0e2123310..5ffbecb612 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -40,7 +40,7 @@ object ServerInitiatedRemoteActorSpec { case class Login(user:String); case class GetUser(); - case class DoSomethingWeird(); + case class DoSomethingFunny(); val instantiatedSessionActors= Set[ActorRef](); @@ -61,7 +61,7 @@ object ServerInitiatedRemoteActorSpec { this.user = user; case GetUser() => self.reply(this.user) - case DoSomethingWeird() => + case DoSomethingFunny() => throw new Exception("Bad boy") } } @@ -94,7 +94,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { server.register(actorOf[RemoteActorSpecActorUnidirectional]) server.register(actorOf[RemoteActorSpecActorBidirectional]) server.register(actorOf[RemoteActorSpecActorAsyncSender]) - server.registerPerSession("statefull-session-actor", actorOf[RemoteStatefullSessionActorSpec]) + server.registerPerSession("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec]) Thread.sleep(1000) } @@ -141,7 +141,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { //RemoteClient.clientFor(HOSTNAME, PORT).connect val session1 = RemoteClient.actorFor( - "statefull-session-actor", + "untyped-session-actor-service", 5000L, HOSTNAME, PORT) @@ -161,7 +161,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { //RemoteClient.clientFor(HOSTNAME, PORT).connect val session2 = RemoteClient.actorFor( - "statefull-session-actor", + "untyped-session-actor-service", 5000L, HOSTNAME, PORT) @@ -179,7 +179,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { val session1 = RemoteClient.actorFor( - "statefull-session-actor", + "untyped-session-actor-service", 5000L, HOSTNAME, PORT) @@ -200,12 +200,12 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { val session1 = RemoteClient.actorFor( - "statefull-session-actor", + "untyped-session-actor-service", 5000L, HOSTNAME, PORT) - session1 ! DoSomethingWeird(); + session1 ! DoSomethingFunny(); session1.stop() RemoteClient.shutdownAll diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala new file mode 100644 index 0000000000..e766c24d67 --- /dev/null +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.actor.remote + +import org.scalatest._ +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import java.util.concurrent.TimeUnit + +import akka.remote.{RemoteServer, RemoteClient} +import akka.actor._ +import RemoteTypedActorLog._ + +object ServerInitiatedRemoteTypedSessionActorSpec { + val HOSTNAME = "localhost" + val PORT = 9990 + var server: RemoteServer = null +} + +@RunWith(classOf[JUnitRunner]) +class ServerInitiatedRemoteTypedSessionActorSpec extends + FlatSpec with + ShouldMatchers with + BeforeAndAfterAll { + import ServerInitiatedRemoteTypedActorSpec._ + + private val unit = TimeUnit.MILLISECONDS + + + override def beforeAll = { + server = new RemoteServer() + server.start(HOSTNAME, PORT) + + server.registerTypedPerSessionActor("typed-session-actor-service", + TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000)) + + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterAll = { + try { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + "A remote session Actor" should "create a new session actor per connection" in { + clearMessageLogs + + val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) + + session1.getUser() should equal ("anonymous"); + session1.login("session[1]"); + session1.getUser() should equal ("session[1]"); + + RemoteClient.shutdownAll + + val session2 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) + + session2.getUser() should equal ("anonymous"); + + RemoteClient.shutdownAll + + } + + it should "stop the actor when the client disconnects" in { + + val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) + + session1.getUser() should equal ("anonymous"); + + RemoteTypedSessionActorImpl.getInstances() should have size (1); + RemoteClient.shutdownAll + Thread.sleep(1000) + RemoteTypedSessionActorImpl.getInstances() should have size (0); + + } + + it should "stop the actor when there is an error" in { + + val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) + + session1.doSomethingFunny(); + + RemoteClient.shutdownAll + Thread.sleep(1000) + RemoteTypedSessionActorImpl.getInstances() should have size (0); + + } + + + it should "be able to unregister" in { + server.registerTypedPerSessionActor("my-service-1",TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000)) + + server.typedActorsFactories.get("my-service-1") should not be (null) + server.unregisterTypedPerSessionActor("my-service-1") + server.typedActorsFactories.get("my-service-1") should be (null) + } + +} + From 8c3588570256e7cca9538c7fbda83a3dbdec05cd Mon Sep 17 00:00:00 2001 From: Paul Pacheco Date: Thu, 18 Nov 2010 11:09:55 -0600 Subject: [PATCH 04/10] Cleaned up patch as suggested by Vicktor --- .../src/main/scala/remote/RemoteServer.scala | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 9afef140b1..4c5442de65 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -326,25 +326,28 @@ class RemoteServer extends Logging with ListenerManagement { } private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { - if (_isRunning && !registry.contains(id)) { - if (!actorRef.isRunning) actorRef.start - registry.put(id, actorRef) + if (_isRunning) { + if (registry.putIfAbsent(id, actorRef) eq null) { + if (!actorRef.isRunning) actorRef.start + } } } private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) { - if (_isRunning && !registry.contains(id)) { - registry.put(id, factory) + if (_isRunning) { + registry.putIfAbsent(id, factory); } } private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) { - if (_isRunning && !registry.contains(id)) registry.put(id, typedActor) + if (_isRunning) { + registry.putIfAbsent(id, typedActor); + } } private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) { - if (_isRunning && !registry.contains(id)) { - registry.put(id, factory) + if (_isRunning) { + registry.putIfAbsent(id, factory); } } @@ -381,7 +384,7 @@ class RemoteServer extends Logging with ListenerManagement { *

* NOTE: You need to call this method if you have registered an actor by a custom ID. */ - def unregisterPerSession(id: String):Unit = synchronized { + def unregisterPerSession(id: String):Unit = { if (_isRunning) { log.info("Unregistering server side remote session actor with id [%s]", id) actorsFactories.remove(id) @@ -406,7 +409,7 @@ class RemoteServer extends Logging with ListenerManagement { *

* NOTE: You need to call this method if you have registered an actor by a custom ID. */ - def unregisterTypedPerSessionActor(id: String):Unit = synchronized { + def unregisterTypedPerSessionActor(id: String):Unit = { if (_isRunning) { typedActorsFactories.remove(id) } @@ -490,8 +493,8 @@ class RemoteServerHandler( val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val CHANNEL_INIT = "channel-init".intern - val sessionActors = new ChannelLocal[Map[String, ActorRef]](); - val typedSessionActors = new ChannelLocal[Map[String, AnyRef]](); + val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]](); + val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]](); applicationLoader.foreach(MessageSerializer.setClassLoader(_)) @@ -503,8 +506,8 @@ class RemoteServerHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) - sessionActors.set(event.getChannel(), Map[String, ActorRef]()); - typedSessionActors.set(event.getChannel(), Map[String, AnyRef]()); + sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()); + typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]()); log.debug("Remote client [%s] connected to [%s]", clientAddress, server.name) if (RemoteServer.SECURE) { val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) @@ -525,15 +528,13 @@ class RemoteServerHandler( val clientAddress = getClientAddress(ctx) log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name) // stop all session actors - for ((id, actorRef) <- sessionActors.get(event.getChannel())) { + for ((id, actorRef) <- sessionActors.remove(event.getChannel())) { actorRef.stop() } - sessionActors.remove(event.getChannel()); - for ((id, actorRef) <- typedSessionActors.get(event.getChannel())) { + for ((id, actorRef) <- typedSessionActors.remove(event.getChannel())) { TypedActor.stop(actorRef) } - typedSessionActors.remove(event.getChannel()); - + server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) } @@ -714,7 +715,7 @@ class RemoteServerHandler( } private def findSessionActor(id: String, channel: Channel) : ActorRef = { - sessionActors.get(channel).getOrElse(id, null) + sessionActors.get(channel).get(id) } private def findTypedActorById(id: String) : AnyRef = { @@ -726,7 +727,7 @@ class RemoteServerHandler( } private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = { - typedSessionActors.get(channel).getOrElse(id, null) + typedSessionActors.get(channel).get(id) } private def findTypedActorByUuid(uuid: String) : AnyRef = { @@ -767,12 +768,11 @@ class RemoteServerHandler( return actorRefOrNull - // the actor has not been registered globally. See if we have it in the session - val sessionActorRefOrNull = findSessionActor(id, channel); if (sessionActorRefOrNull ne null) return sessionActorRefOrNull + // we dont have it in the session either, see if we have a factory for it val actorFactoryOrNull = findActorFactory(id) if (actorFactoryOrNull ne null) { From 126ea2caaf0379a93f12f2d8196be7b8a26d7286 Mon Sep 17 00:00:00 2001 From: Paul Pacheco Date: Thu, 18 Nov 2010 12:48:31 -0600 Subject: [PATCH 05/10] refactored the createActor function to make it easier to understand and remove the return statements; --- .../src/main/scala/remote/RemoteServer.scala | 135 ++++++++++++------ 1 file changed, 90 insertions(+), 45 deletions(-) diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 4c5442de65..57fb3d4078 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -528,11 +528,16 @@ class RemoteServerHandler( val clientAddress = getClientAddress(ctx) log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name) // stop all session actors - for ((id, actorRef) <- sessionActors.remove(event.getChannel())) { - actorRef.stop() + val channelActors = sessionActors.remove(event.getChannel()) + val channelActorsIterator = channelActors.elements() + while (channelActorsIterator.hasMoreElements()) { + channelActorsIterator.nextElement().stop() } - for ((id, actorRef) <- typedSessionActors.remove(event.getChannel())) { - TypedActor.stop(actorRef) + + val channelTypedActors = typedSessionActors.remove(event.getChannel()) + val channelTypedActorsIterator = channelTypedActors.elements() + while (channelTypedActorsIterator.hasMoreElements()) { + TypedActor.stop(channelTypedActorsIterator.nextElement()) } server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) @@ -749,40 +754,36 @@ class RemoteServerHandler( } /** - * Creates a new instance of the actor with name, uuid and timeout specified as arguments. - * - * If actor already created then just return it from the registry. - * - * Does not start the actor. + * gets the actor from the session, or creates one if there is a factory for it */ - private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { + private def createSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { val uuid = actorInfo.getUuid val id = actorInfo.getId - - val name = actorInfo.getTarget - val timeout = actorInfo.getTimeout - - val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - - if (actorRefOrNull ne null) - return actorRefOrNull - - val sessionActorRefOrNull = findSessionActor(id, channel); if (sessionActorRefOrNull ne null) - return sessionActorRefOrNull - - - // we dont have it in the session either, see if we have a factory for it - val actorFactoryOrNull = findActorFactory(id) - if (actorFactoryOrNull ne null) { - val actorRef = actorFactoryOrNull(); - actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) - sessionActors.get(channel).put(id, actorRef); - return actorRef + sessionActorRefOrNull + else + { + // we dont have it in the session either, see if we have a factory for it + val actorFactoryOrNull = findActorFactory(id) + if (actorFactoryOrNull ne null) { + val actorRef = actorFactoryOrNull(); + actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) + sessionActors.get(channel).put(id, actorRef); + actorRef + } + else + null } + } + + + private def createClientManagedActor(actorInfo: ActorInfoProtocol): ActorRef = { + val uuid = actorInfo.getUuid + val id = actorInfo.getId + val timeout = actorInfo.getTimeout + val name = actorInfo.getTarget - // None of the above, so treat it as a client managed remote actor try { if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") @@ -803,35 +804,61 @@ class RemoteServerHandler( server.notifyListeners(RemoteServerError(e, server)) throw e } + } - private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = { + /** + * Creates a new instance of the actor with name, uuid and timeout specified as arguments. + * + * If actor already created then just return it from the registry. + * + * Does not start the actor. + */ + private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { val uuid = actorInfo.getUuid val id = actorInfo.getId - val typedActorOrNull = findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - if (typedActorOrNull ne null) - return typedActorOrNull; + val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - // the actor has not been registered globally. See if we have it in the session + if (actorRefOrNull ne null) + actorRefOrNull + else + { + // the actor has not been registered globally. See if we have it in the session + val sessionActorRefOrNull = createSessionActor(actorInfo, channel); + if (sessionActorRefOrNull ne null) + sessionActorRefOrNull + else // maybe it is a client managed actor + createClientManagedActor(actorInfo) + } + } + /** + * gets the actor from the session, or creates one if there is a factory for it + */ + private def createTypedSessionActor(actorInfo: ActorInfoProtocol, channel: Channel):AnyRef ={ + val id = actorInfo.getId val sessionActorRefOrNull = findTypedSessionActor(id, channel); if (sessionActorRefOrNull ne null) - return sessionActorRefOrNull - - // we dont have it in the session either, see if we have a factory for it - val actorFactoryOrNull = findTypedActorFactory(id) - if (actorFactoryOrNull ne null) { - val newInstance = actorFactoryOrNull(); - typedSessionActors.get(channel).put(id, newInstance); - return newInstance + sessionActorRefOrNull + else { + val actorFactoryOrNull = findTypedActorFactory(id) + if (actorFactoryOrNull ne null) { + val newInstance = actorFactoryOrNull(); + typedSessionActors.get(channel).put(id, newInstance); + newInstance + } + else + null } - // None of the above, so treat it as a client managed remote actor + } + private def createClientManagedTypedActor(actorInfo: ActorInfoProtocol) = { val typedActorInfo = actorInfo.getTypedActorInfo val interfaceClassname = typedActorInfo.getInterface val targetClassname = actorInfo.getTarget + val uuid = actorInfo.getUuid try { if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( @@ -856,6 +883,24 @@ class RemoteServerHandler( } } + private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = { + val uuid = actorInfo.getUuid + val id = actorInfo.getId + + val typedActorOrNull = findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) + if (typedActorOrNull ne null) + typedActorOrNull + else + { + // the actor has not been registered globally. See if we have it in the session + val sessionActorRefOrNull = createTypedSessionActor(actorInfo, channel) + if (sessionActorRefOrNull ne null) + sessionActorRefOrNull + else // maybe it is a client managed actor + createClientManagedTypedActor(actorInfo) + } + } + private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = { val actorInfo = request.getActorInfo log.error(exception, "Could not invoke remote actor [%s]", actorInfo.getTarget) From a605ac4eeadf66115f82d0b0d4a9cfee727246f7 Mon Sep 17 00:00:00 2001 From: Paul Pacheco Date: Thu, 18 Nov 2010 22:38:45 -0600 Subject: [PATCH 06/10] Refatored createActor, separate unit tests cleanup according to viktor's suggestions --- .../src/main/scala/remote/RemoteServer.scala | 13 +- .../ServerInitiatedRemoteActorSpec.scala | 116 ------------- ...erverInitiatedRemoteSessionActorSpec.scala | 162 ++++++++++++++++++ ...InitiatedRemoteTypedSessionActorSpec.scala | 6 +- 4 files changed, 175 insertions(+), 122 deletions(-) create mode 100644 akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 57fb3d4078..7ec32c7c80 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -820,16 +820,23 @@ class RemoteServerHandler( val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - if (actorRefOrNull ne null) + if (actorRefOrNull ne null) { + println("Giving actor by id or uuid ") actorRefOrNull + } else { // the actor has not been registered globally. See if we have it in the session val sessionActorRefOrNull = createSessionActor(actorInfo, channel); - if (sessionActorRefOrNull ne null) + if (sessionActorRefOrNull ne null) { + println("giving session actor") sessionActorRefOrNull + } else // maybe it is a client managed actor - createClientManagedActor(actorInfo) + { + println("Client managed actor") + createClientManagedActor(actorInfo) + } } } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 5ffbecb612..b0dee8563e 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -38,35 +38,6 @@ object ServerInitiatedRemoteActorSpec { } } - case class Login(user:String); - case class GetUser(); - case class DoSomethingFunny(); - - val instantiatedSessionActors= Set[ActorRef](); - - class RemoteStatefullSessionActorSpec extends Actor { - - var user : String= "anonymous"; - - override def preStart = { - instantiatedSessionActors += self; - } - - override def postStop = { - instantiatedSessionActors -= self; - } - - def receive = { - case Login(user) => - this.user = user; - case GetUser() => - self.reply(this.user) - case DoSomethingFunny() => - throw new Exception("Bad boy") - } - } - - object RemoteActorSpecActorAsyncSender { val latch = new CountDownLatch(1) } @@ -94,7 +65,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { server.register(actorOf[RemoteActorSpecActorUnidirectional]) server.register(actorOf[RemoteActorSpecActorBidirectional]) server.register(actorOf[RemoteActorSpecActorAsyncSender]) - server.registerPerSession("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec]) Thread.sleep(1000) } @@ -135,85 +105,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { actor.stop } - @Test - def shouldKeepSessionInformation { - - //RemoteClient.clientFor(HOSTNAME, PORT).connect - - val session1 = RemoteClient.actorFor( - "untyped-session-actor-service", - 5000L, - HOSTNAME, PORT) - - - val default1 = session1 !! GetUser(); - assert("anonymous" === default1.get.asInstanceOf[String]) - - session1 ! Login("session[1]"); - - val result1 = session1 !! GetUser(); - assert("session[1]" === result1.get.asInstanceOf[String]) - - session1.stop() - - RemoteClient.shutdownAll - - //RemoteClient.clientFor(HOSTNAME, PORT).connect - - val session2 = RemoteClient.actorFor( - "untyped-session-actor-service", - 5000L, - HOSTNAME, PORT) - - // since this is a new session, the server should reset the state - val default2 = session2 !! GetUser(); - assert("anonymous" === default2.get.asInstanceOf[String]) - - session2.stop() - - RemoteClient.shutdownAll - } - - @Test - def shouldStopActorOnDisconnect{ - - - val session1 = RemoteClient.actorFor( - "untyped-session-actor-service", - 5000L, - HOSTNAME, PORT) - - - val default1 = session1 !! GetUser(); - assert("anonymous" === default1.get.asInstanceOf[String]) - - assert(instantiatedSessionActors.size == 1); - - RemoteClient.shutdownAll - Thread.sleep(1000) - assert(instantiatedSessionActors.size == 0); - - } - - @Test - def shouldStopActorOnError{ - - - val session1 = RemoteClient.actorFor( - "untyped-session-actor-service", - 5000L, - HOSTNAME, PORT) - - - session1 ! DoSomethingFunny(); - session1.stop() - - RemoteClient.shutdownAll - Thread.sleep(1000) - - assert(instantiatedSessionActors.size == 0); - - } @Test def shouldSendWithBangAndGetReplyThroughSenderRef { @@ -317,13 +208,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { server.unregister("my-service-1") assert(server.actors.get("my-service-1") eq null, "actor unregistered") } - @Test - def shouldRegisterAndUnregisterSession { - server.registerPerSession("my-service-1", actorOf[RemoteActorSpecActorUnidirectional]) - assert(server.actorsFactories.get("my-service-1") ne null, "actor registered") - server.unregisterPerSession("my-service-1") - assert(server.actorsFactories.get("my-service-1") eq null, "actor unregistered") - } @Test def shouldRegisterAndUnregisterByUuid { diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala new file mode 100644 index 0000000000..e1f1db4c9e --- /dev/null +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala @@ -0,0 +1,162 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.actor.remote + +import org.scalatest._ +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import java.util.concurrent.TimeUnit + +import akka.remote.{RemoteServer, RemoteClient} +import akka.actor._ +import RemoteTypedActorLog._ + +object ServerInitiatedRemoteSessionActorSpec { + val HOSTNAME = "localhost" + val PORT = 9990 + var server: RemoteServer = null + + case class Login(user:String); + case class GetUser(); + case class DoSomethingFunny(); + + val instantiatedSessionActors= Set[ActorRef](); + + class RemoteStatefullSessionActorSpec extends Actor { + + var user : String= "anonymous"; + + override def preStart = { + instantiatedSessionActors += self; + } + + override def postStop = { + instantiatedSessionActors -= self; + } + + def receive = { + case Login(user) => + this.user = user; + case GetUser() => + self.reply(this.user) + case DoSomethingFunny() => + throw new Exception("Bad boy") + } + } + +} + +@RunWith(classOf[JUnitRunner]) +class ServerInitiatedRemoteSessionActorSpec extends + FlatSpec with + ShouldMatchers with + BeforeAndAfterEach { + import ServerInitiatedRemoteTypedActorSpec._ + + private val unit = TimeUnit.MILLISECONDS + + + override def beforeEach = { + server = new RemoteServer() + server.start(HOSTNAME, PORT) + + server.registerTypedPerSessionActor("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec]) + + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterEach = { + try { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + "A remote session Actor" should "create a new session actor per connection" in { + clearMessageLogs + + val session1 = RemoteClient.actorFor( + "untyped-session-actor-service", + 5000L, + HOSTNAME, PORT) + + val default1 = session1 !! GetUser() + default1.get.asInstanceOf[String] should equal ("anonymous") + session1 ! Login("session[1]") + val result1 = session1 !! GetUser() + result1.get.asInstanceOf[String] should equal ("session[1]") + + session1.stop() + + RemoteClient.shutdownAll + + //RemoteClient.clientFor(HOSTNAME, PORT).connect + + val session2 = RemoteClient.actorFor( + "untyped-session-actor-service", + 5000L, + HOSTNAME, PORT) + + // since this is a new session, the server should reset the state + val default2 = session2 !! GetUser(); + default2.get.asInstanceOf[String] should equal ("anonymous") + + session2.stop() + + } + + it should "stop the actor when the client disconnects" in { + + val session1 = RemoteClient.actorFor( + "untyped-session-actor-service", + 5000L, + HOSTNAME, PORT) + + + val default1 = session1 !! GetUser() + default1.get.asInstanceOf[String] should equal ("anonymous") + + instantiatedSessionActors.size should have size (1) + + RemoteClient.shutdownAll + Thread.sleep(1000) + instantiatedSessionActors.size should have size (0); + + } + + it should "stop the actor when there is an error" in { + + val session1 = RemoteClient.actorFor( + "untyped-session-actor-service", + 5000L, + HOSTNAME, PORT) + + + session1 ! DoSomethingFunny(); + session1.stop() + + RemoteClient.shutdownAll + Thread.sleep(1000) + + instantiatedSessionActors.size should have size (0); + } + + + it should "be able to unregister" in { + server.registerPerSession("my-service-1", actorOf[RemoteActorSpecActorUnidirectional]) + server.actorsFactories.get("my-service-1") should not be (null) + server.unregisterPerSession("my-service-1") + server.actorsFactories.get("my-service-1") should be (null) + } + +} + diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala index e766c24d67..575d82ca96 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala @@ -26,13 +26,13 @@ object ServerInitiatedRemoteTypedSessionActorSpec { class ServerInitiatedRemoteTypedSessionActorSpec extends FlatSpec with ShouldMatchers with - BeforeAndAfterAll { + BeforeAndAfterEach { import ServerInitiatedRemoteTypedActorSpec._ private val unit = TimeUnit.MILLISECONDS - override def beforeAll = { + override def beforeEach = { server = new RemoteServer() server.start(HOSTNAME, PORT) @@ -43,7 +43,7 @@ class ServerInitiatedRemoteTypedSessionActorSpec extends } // make sure the servers shutdown cleanly after the test has finished - override def afterAll = { + override def afterEach = { try { server.shutdown RemoteClient.shutdownAll From 28037666d592556bb0a017d58bce563f4599ad1f Mon Sep 17 00:00:00 2001 From: Paul Pacheco Date: Fri, 19 Nov 2010 11:53:51 -0600 Subject: [PATCH 07/10] Cleaned up some semicolons Test now compiles (but does not pass) --- .../src/main/scala/remote/RemoteServer.scala | 28 ++++++++-------- ...erverInitiatedRemoteSessionActorSpec.scala | 33 ++++++++++--------- ...InitiatedRemoteTypedSessionActorSpec.scala | 18 +++++----- 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 7ec32c7c80..c129506ad3 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -335,19 +335,19 @@ class RemoteServer extends Logging with ListenerManagement { private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) { if (_isRunning) { - registry.putIfAbsent(id, factory); + registry.putIfAbsent(id, factory) } } private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) { if (_isRunning) { - registry.putIfAbsent(id, typedActor); + registry.putIfAbsent(id, typedActor) } } private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) { if (_isRunning) { - registry.putIfAbsent(id, factory); + registry.putIfAbsent(id, factory) } } @@ -493,8 +493,8 @@ class RemoteServerHandler( val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val CHANNEL_INIT = "channel-init".intern - val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]](); - val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]](); + val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]() + val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]() applicationLoader.foreach(MessageSerializer.setClassLoader(_)) @@ -506,8 +506,8 @@ class RemoteServerHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) - sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()); - typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]()); + sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()) + typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]()) log.debug("Remote client [%s] connected to [%s]", clientAddress, server.name) if (RemoteServer.SECURE) { val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) @@ -759,7 +759,7 @@ class RemoteServerHandler( private def createSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { val uuid = actorInfo.getUuid val id = actorInfo.getId - val sessionActorRefOrNull = findSessionActor(id, channel); + val sessionActorRefOrNull = findSessionActor(id, channel) if (sessionActorRefOrNull ne null) sessionActorRefOrNull else @@ -767,9 +767,9 @@ class RemoteServerHandler( // we dont have it in the session either, see if we have a factory for it val actorFactoryOrNull = findActorFactory(id) if (actorFactoryOrNull ne null) { - val actorRef = actorFactoryOrNull(); + val actorRef = actorFactoryOrNull() actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) - sessionActors.get(channel).put(id, actorRef); + sessionActors.get(channel).put(id, actorRef) actorRef } else @@ -827,7 +827,7 @@ class RemoteServerHandler( else { // the actor has not been registered globally. See if we have it in the session - val sessionActorRefOrNull = createSessionActor(actorInfo, channel); + val sessionActorRefOrNull = createSessionActor(actorInfo, channel) if (sessionActorRefOrNull ne null) { println("giving session actor") sessionActorRefOrNull @@ -845,14 +845,14 @@ class RemoteServerHandler( */ private def createTypedSessionActor(actorInfo: ActorInfoProtocol, channel: Channel):AnyRef ={ val id = actorInfo.getId - val sessionActorRefOrNull = findTypedSessionActor(id, channel); + val sessionActorRefOrNull = findTypedSessionActor(id, channel) if (sessionActorRefOrNull ne null) sessionActorRefOrNull else { val actorFactoryOrNull = findTypedActorFactory(id) if (actorFactoryOrNull ne null) { - val newInstance = actorFactoryOrNull(); - typedSessionActors.get(channel).put(id, newInstance); + val newInstance = actorFactoryOrNull() + typedSessionActors.get(channel).put(id, newInstance) newInstance } else diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala index e1f1db4c9e..dab1f83437 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala @@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit import akka.remote.{RemoteServer, RemoteClient} import akka.actor._ +import akka.actor.Actor._ import RemoteTypedActorLog._ object ServerInitiatedRemoteSessionActorSpec { @@ -21,27 +22,27 @@ object ServerInitiatedRemoteSessionActorSpec { val PORT = 9990 var server: RemoteServer = null - case class Login(user:String); - case class GetUser(); - case class DoSomethingFunny(); + case class Login(user:String) + case class GetUser() + case class DoSomethingFunny() - val instantiatedSessionActors= Set[ActorRef](); + var instantiatedSessionActors= Set[ActorRef]() class RemoteStatefullSessionActorSpec extends Actor { - var user : String= "anonymous"; + var user : String= "anonymous" override def preStart = { - instantiatedSessionActors += self; + instantiatedSessionActors += self } override def postStop = { - instantiatedSessionActors -= self; + instantiatedSessionActors -= self } def receive = { case Login(user) => - this.user = user; + this.user = user case GetUser() => self.reply(this.user) case DoSomethingFunny() => @@ -56,7 +57,7 @@ class ServerInitiatedRemoteSessionActorSpec extends FlatSpec with ShouldMatchers with BeforeAndAfterEach { - import ServerInitiatedRemoteTypedActorSpec._ + import ServerInitiatedRemoteSessionActorSpec._ private val unit = TimeUnit.MILLISECONDS @@ -65,7 +66,7 @@ class ServerInitiatedRemoteSessionActorSpec extends server = new RemoteServer() server.start(HOSTNAME, PORT) - server.registerTypedPerSessionActor("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec]) + server.registerPerSession("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec]) Thread.sleep(1000) } @@ -107,7 +108,7 @@ class ServerInitiatedRemoteSessionActorSpec extends HOSTNAME, PORT) // since this is a new session, the server should reset the state - val default2 = session2 !! GetUser(); + val default2 = session2 !! GetUser() default2.get.asInstanceOf[String] should equal ("anonymous") session2.stop() @@ -125,11 +126,11 @@ class ServerInitiatedRemoteSessionActorSpec extends val default1 = session1 !! GetUser() default1.get.asInstanceOf[String] should equal ("anonymous") - instantiatedSessionActors.size should have size (1) + instantiatedSessionActors should have size (1) RemoteClient.shutdownAll Thread.sleep(1000) - instantiatedSessionActors.size should have size (0); + instantiatedSessionActors should have size (0) } @@ -141,18 +142,18 @@ class ServerInitiatedRemoteSessionActorSpec extends HOSTNAME, PORT) - session1 ! DoSomethingFunny(); + session1 ! DoSomethingFunny() session1.stop() RemoteClient.shutdownAll Thread.sleep(1000) - instantiatedSessionActors.size should have size (0); + instantiatedSessionActors should have size (0) } it should "be able to unregister" in { - server.registerPerSession("my-service-1", actorOf[RemoteActorSpecActorUnidirectional]) + server.registerPerSession("my-service-1", actorOf[RemoteStatefullSessionActorSpec]) server.actorsFactories.get("my-service-1") should not be (null) server.unregisterPerSession("my-service-1") server.actorsFactories.get("my-service-1") should be (null) diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala index 575d82ca96..2e82afc6ba 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala @@ -58,15 +58,15 @@ class ServerInitiatedRemoteTypedSessionActorSpec extends val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) - session1.getUser() should equal ("anonymous"); - session1.login("session[1]"); - session1.getUser() should equal ("session[1]"); + session1.getUser() should equal ("anonymous") + session1.login("session[1]") + session1.getUser() should equal ("session[1]") RemoteClient.shutdownAll val session2 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) - session2.getUser() should equal ("anonymous"); + session2.getUser() should equal ("anonymous") RemoteClient.shutdownAll @@ -76,12 +76,12 @@ class ServerInitiatedRemoteTypedSessionActorSpec extends val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) - session1.getUser() should equal ("anonymous"); + session1.getUser() should equal ("anonymous") - RemoteTypedSessionActorImpl.getInstances() should have size (1); + RemoteTypedSessionActorImpl.getInstances() should have size (1) RemoteClient.shutdownAll Thread.sleep(1000) - RemoteTypedSessionActorImpl.getInstances() should have size (0); + RemoteTypedSessionActorImpl.getInstances() should have size (0) } @@ -89,11 +89,11 @@ class ServerInitiatedRemoteTypedSessionActorSpec extends val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT) - session1.doSomethingFunny(); + session1.doSomethingFunny() RemoteClient.shutdownAll Thread.sleep(1000) - RemoteTypedSessionActorImpl.getInstances() should have size (0); + RemoteTypedSessionActorImpl.getInstances() should have size (0) } From e5fdbaaeb4964f7bbe8b465a258e59ad33655c78 Mon Sep 17 00:00:00 2001 From: Paul Pacheco Date: Sat, 20 Nov 2010 15:48:37 -0600 Subject: [PATCH 08/10] tests pass --- .../src/main/scala/remote/RemoteServer.scala | 36 +++++++++---------- .../ServerInitiatedRemoteActorSpec.scala | 6 ++-- ...erverInitiatedRemoteSessionActorSpec.scala | 1 - ...InitiatedRemoteTypedSessionActorSpec.scala | 2 -- 4 files changed, 18 insertions(+), 27 deletions(-) diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index c129506ad3..6dd418b046 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -326,28 +326,31 @@ class RemoteServer extends Logging with ListenerManagement { } private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { - if (_isRunning) { - if (registry.putIfAbsent(id, actorRef) eq null) { - if (!actorRef.isRunning) actorRef.start - } + // TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here + if (_isRunning && !registry.contains(id)) { + registry.put(id, actorRef); + if (!actorRef.isRunning) actorRef.start } } private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) { - if (_isRunning) { - registry.putIfAbsent(id, factory) + // TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here + if (_isRunning && !registry.contains(id)) { + registry.put(id, factory) } } private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) { - if (_isRunning) { - registry.putIfAbsent(id, typedActor) + // TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here + if (_isRunning && !registry.contains(id)) { + registry.put(id, typedActor) } } private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) { - if (_isRunning) { - registry.putIfAbsent(id, factory) + // TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here + if (_isRunning && !registry.contains(id)) { + registry.put(id, factory) } } @@ -820,23 +823,16 @@ class RemoteServerHandler( val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - if (actorRefOrNull ne null) { - println("Giving actor by id or uuid ") + if (actorRefOrNull ne null) actorRefOrNull - } else { // the actor has not been registered globally. See if we have it in the session val sessionActorRefOrNull = createSessionActor(actorInfo, channel) - if (sessionActorRefOrNull ne null) { - println("giving session actor") + if (sessionActorRefOrNull ne null) sessionActorRefOrNull - } else // maybe it is a client managed actor - { - println("Client managed actor") - createClientManagedActor(actorInfo) - } + createClientManagedActor(actorInfo) } } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index b0dee8563e..5881687c01 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -3,18 +3,17 @@ package akka.actor.remote import java.util.concurrent.{CountDownLatch, TimeUnit} import org.scalatest.junit.JUnitSuite import org.junit.{Test, Before, After} - +import akka.util._ + import akka.remote.{RemoteServer, RemoteClient} import akka.actor.Actor._ import akka.actor.{ActorRegistry, ActorRef, Actor} -import scala.collection.mutable.Set object ServerInitiatedRemoteActorSpec { val HOSTNAME = "localhost" val PORT = 9990 var server: RemoteServer = null - case class Send(actor: ActorRef) object RemoteActorSpecActorUnidirectional { @@ -105,7 +104,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { actor.stop } - @Test def shouldSendWithBangAndGetReplyThroughSenderRef { implicit val timeout = 500000000L diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala index dab1f83437..173898c1fe 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala @@ -145,7 +145,6 @@ class ServerInitiatedRemoteSessionActorSpec extends session1 ! DoSomethingFunny() session1.stop() - RemoteClient.shutdownAll Thread.sleep(1000) instantiatedSessionActors should have size (0) diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala index 2e82afc6ba..0ae4ca2dee 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala @@ -68,8 +68,6 @@ class ServerInitiatedRemoteTypedSessionActorSpec extends session2.getUser() should equal ("anonymous") - RemoteClient.shutdownAll - } it should "stop the actor when the client disconnects" in { From 1bc4bc0a8937c9fec5a2e6d11d5a9530dfc9b2ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Mon, 22 Nov 2010 08:59:45 +0100 Subject: [PATCH 09/10] Fixed bug in ActorRegistry getting typed actor by manifest --- .../main/scala/akka/actor/ActorRegistry.scala | 101 +++++++----------- .../ExecutorBasedEventDrivenDispatcher.scala | 10 +- .../scala/akka/dispatch/MailboxHandling.scala | 16 +-- .../scala/akka/util/ReflectiveAccess.scala | 58 +++++----- .../scala/akka/misc/ActorRegistrySpec.scala | 1 - 5 files changed, 80 insertions(+), 106 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index bf0a479f7f..aebfe70d85 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -37,10 +37,10 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent * @author Jonas Bonér */ object ActorRegistry extends ListenerManagement { - private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef] - private val actorsById = new Index[String,ActorRef] - private val remoteActorSets = Map[Address, RemoteActorSet]() - private val guard = new ReadWriteGuard + private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef] + private val actorsById = new Index[String,ActorRef] + private val remoteActorSets = Map[Address, RemoteActorSet]() + private val guard = new ReadWriteGuard /** * Returns all actors in the system. @@ -50,7 +50,7 @@ object ActorRegistry extends ListenerManagement { /** * Returns the number of actors in the system. */ - def size : Int = actorsByUUID.size + def size: Int = actorsByUUID.size /** * Invokes a function for all actors. @@ -68,8 +68,7 @@ object ActorRegistry extends ListenerManagement { val elements = actorsByUUID.elements while (elements.hasMoreElements) { val element = elements.nextElement - if(f isDefinedAt element) - return Some(f(element)) + if (f isDefinedAt element) return Some(f(element)) } None } @@ -88,9 +87,7 @@ object ActorRegistry extends ListenerManagement { val elements = actorsByUUID.elements while (elements.hasMoreElements) { val actorId = elements.nextElement - if (p(actorId)) { - all += actorId - } + if (p(actorId)) all += actorId } all.toArray } @@ -105,7 +102,7 @@ object ActorRegistry extends ListenerManagement { * Finds any actor that matches T. */ def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] = - find({ case a:ActorRef if manifest.erasure.isAssignableFrom(a.actor.getClass) => a }) + find({ case a: ActorRef if manifest.erasure.isAssignableFrom(a.actor.getClass) => a }) /** * Finds all actors of type or sub-type specified by the class passed in as the Class argument. @@ -132,13 +129,11 @@ object ActorRegistry extends ListenerManagement { * Invokes a function for all typed actors. */ def foreachTypedActor(f: (AnyRef) => Unit) = { - TypedActorModule.ensureTypedActorEnabled + TypedActorModule.ensureEnabled val elements = actorsByUUID.elements while (elements.hasMoreElements) { val proxy = typedActorFor(elements.nextElement) - if (proxy.isDefined) { - f(proxy.get) - } + if (proxy.isDefined) f(proxy.get) } } @@ -147,12 +142,11 @@ object ActorRegistry extends ListenerManagement { * Returns None if the function never returns Some */ def findTypedActor[T](f: PartialFunction[AnyRef,T]) : Option[T] = { - TypedActorModule.ensureTypedActorEnabled + TypedActorModule.ensureEnabled val elements = actorsByUUID.elements while (elements.hasMoreElements) { val proxy = typedActorFor(elements.nextElement) - if(proxy.isDefined && (f isDefinedAt proxy)) - return Some(f(proxy)) + if (proxy.isDefined && (f isDefinedAt proxy)) return Some(f(proxy)) } None } @@ -161,14 +155,12 @@ object ActorRegistry extends ListenerManagement { * Finds all typed actors that satisfy a predicate. */ def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = { - TypedActorModule.ensureTypedActorEnabled + TypedActorModule.ensureEnabled val all = new ListBuffer[AnyRef] val elements = actorsByUUID.elements while (elements.hasMoreElements) { val proxy = typedActorFor(elements.nextElement) - if (proxy.isDefined && p(proxy.get)) { - all += proxy.get - } + if (proxy.isDefined && p(proxy.get)) all += proxy.get } all.toArray } @@ -177,7 +169,7 @@ object ActorRegistry extends ListenerManagement { * Finds all typed actors that are subtypes of the class passed in as the Manifest argument. */ def typedActorsFor[T <: AnyRef](implicit manifest: Manifest[T]): Array[AnyRef] = { - TypedActorModule.ensureTypedActorEnabled + TypedActorModule.ensureEnabled typedActorsFor[T](manifest.erasure.asInstanceOf[Class[T]]) } @@ -185,20 +177,20 @@ object ActorRegistry extends ListenerManagement { * Finds any typed actor that matches T. */ def typedActorFor[T <: AnyRef](implicit manifest: Manifest[T]): Option[AnyRef] = { - TypedActorModule.ensureTypedActorEnabled - def predicate(proxy: AnyRef) : Boolean = { + TypedActorModule.ensureEnabled + def predicate(proxy: AnyRef): Boolean = { val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) actorRef.isDefined && manifest.erasure.isAssignableFrom(actorRef.get.actor.getClass) } - findTypedActor({ case a:AnyRef if predicate(a) => a }) + findTypedActor({ case a: Some[AnyRef] if predicate(a.get) => a }) } /** * Finds all typed actors of type or sub-type specified by the class passed in as the Class argument. */ def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = { - TypedActorModule.ensureTypedActorEnabled - def predicate(proxy: AnyRef) : Boolean = { + TypedActorModule.ensureEnabled + def predicate(proxy: AnyRef): Boolean = { val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass) } @@ -209,7 +201,7 @@ object ActorRegistry extends ListenerManagement { * Finds all typed actors that have a specific id. */ def typedActorsFor(id: String): Array[AnyRef] = { - TypedActorModule.ensureTypedActorEnabled + TypedActorModule.ensureEnabled val actorRefs = actorsById values id actorRefs.flatMap(typedActorFor(_)) } @@ -218,12 +210,10 @@ object ActorRegistry extends ListenerManagement { * Finds the typed actor that has a specific UUID. */ def typedActorFor(uuid: Uuid): Option[AnyRef] = { - TypedActorModule.ensureTypedActorEnabled + TypedActorModule.ensureEnabled val actorRef = actorsByUUID get uuid - if (actorRef eq null) - None - else - typedActorFor(actorRef) + if (actorRef eq null) None + else typedActorFor(actorRef) } /** @@ -265,20 +255,15 @@ object ActorRegistry extends ListenerManagement { */ def shutdownAll() { log.info("Shutting down all actors in the system...") - if (TypedActorModule.isTypedActorEnabled) { + if (TypedActorModule.isEnabled) { val elements = actorsByUUID.elements while (elements.hasMoreElements) { val actorRef = elements.nextElement val proxy = typedActorFor(actorRef) - if (proxy.isDefined) { - TypedActorModule.typedActorObjectInstance.get.stop(proxy.get) - } else { - actorRef.stop - } + if (proxy.isDefined) TypedActorModule.typedActorObjectInstance.get.stop(proxy.get) + else actorRef.stop } - } else { - foreach(_.stop) - } + } else foreach(_.stop) actorsByUUID.clear actorsById.clear log.info("All actors have been shut down and unregistered from ActorRegistry") @@ -337,16 +322,13 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] { if (set ne null) { set.synchronized { - if (set.isEmpty) { - retry = true //IF the set is empty then it has been removed, so signal retry - } + if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry else { //Else add the value to the set and signal that retry is not needed added = set add v retry = false } } - } - else { + } else { val newSet = new ConcurrentSkipListSet[V] newSet add v @@ -354,24 +336,20 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] { val oldSet = container.putIfAbsent(k,newSet) if (oldSet ne null) { oldSet.synchronized { - if (oldSet.isEmpty) { - retry = true //IF the set is empty then it has been removed, so signal retry - } + if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry else { //Else try to add the value to the set and signal that retry is not needed added = oldSet add v retry = false } } - } else { - added = true - } + } else added = true } - if (retry) spinPut(k,v) + if (retry) spinPut(k, v) else added } - spinPut(key,value) + spinPut(key, value) } /** @@ -390,10 +368,8 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] { def findValue(key: K)(f: (V) => Boolean): Option[V] = { import scala.collection.JavaConversions._ val set = container get key - if (set ne null) - set.iterator.find(f) - else - None + if (set ne null) set.iterator.find(f) + else None } /** @@ -420,8 +396,7 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] { container.remove(key,emptySet) //We try to remove the key if it's mapped to an empty set true //Remove succeeded - } - else false //Remove failed + } else false //Remove failed } } else false //Remove failed } @@ -434,5 +409,5 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] { /** * Removes all keys and all values */ - def clear = foreach { case (k,v) => remove(k,v) } + def clear = foreach { case (k, v) => remove(k, v) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 83ff50427a..ee5dd890b8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -5,7 +5,7 @@ package akka.dispatch import akka.actor.{ActorRef, IllegalActorStateException} -import akka.util.ReflectiveAccess.EnterpriseModule +import akka.util.ReflectiveAccess.AkkaCloudModule import java.util.Queue import akka.util.Switch @@ -123,10 +123,10 @@ class ExecutorBasedEventDrivenDispatcher( */ def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { // FIXME make generic (work for TypedActor as well) - case FileBasedDurableMailbox(serializer) => EnterpriseModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case ZooKeeperBasedDurableMailbox(serializer) => EnterpriseModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case BeanstalkBasedDurableMailbox(serializer) => EnterpriseModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case RedisBasedDurableMailbox(serializer) => EnterpriseModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case FileBasedDurableMailbox(serializer) => AkkaCloudModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case ZooKeeperBasedDurableMailbox(serializer) => AkkaCloudModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case BeanstalkBasedDurableMailbox(serializer) => AkkaCloudModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case RedisBasedDurableMailbox(serializer) => AkkaCloudModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue] case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported") case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") } diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index ff71b607ce..7e81d4a598 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -5,7 +5,7 @@ package akka.dispatch import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException} -import akka.util.ReflectiveAccess.EnterpriseModule +import akka.util.ReflectiveAccess.AkkaCloudModule import akka.AkkaException import java.util.{Queue, List} @@ -42,15 +42,15 @@ case class BoundedMailbox( if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") } -abstract class DurableMailboxType(val serializer: EnterpriseModule.Serializer) extends MailboxType { +abstract class DurableMailboxType(val serializer: AkkaCloudModule.Serializer) extends MailboxType { if (serializer eq null) throw new IllegalArgumentException("The serializer for DurableMailboxType can not be null") } -case class FileBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) -case class RedisBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) -case class BeanstalkBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) -case class ZooKeeperBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) -case class AMQPBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) -case class JMSBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) +case class FileBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) +case class RedisBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) +case class BeanstalkBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) +case class ZooKeeperBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) +case class AMQPBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) +case class JMSBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) class DefaultUnboundedMessageQueue(blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation] with MessageQueue { diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 5257d596f0..b5a4c7edb7 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -20,13 +20,13 @@ object ReflectiveAccess extends Logging { val loader = getClass.getClassLoader - lazy val isRemotingEnabled = RemoteClientModule.isRemotingEnabled - lazy val isTypedActorEnabled = TypedActorModule.isTypedActorEnabled - lazy val isEnterpriseEnabled = EnterpriseModule.isEnterpriseEnabled + lazy val isRemotingEnabled = RemoteClientModule.isEnabled + lazy val isTypedActorEnabled = TypedActorModule.isEnabled + lazy val isAkkaCloudEnabled = AkkaCloudModule.isEnabled - def ensureRemotingEnabled = RemoteClientModule.ensureRemotingEnabled - def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled - def ensureEnterpriseEnabled = EnterpriseModule.ensureEnterpriseEnabled + def ensureRemotingEnabled = RemoteClientModule.ensureEnabled + def ensureTypedActorEnabled = TypedActorModule.ensureEnabled + def ensureAkkaCloudEnabled = AkkaCloudModule.ensureEnabled /** * Reflective access to the RemoteClient module. @@ -56,32 +56,32 @@ object ReflectiveAccess extends Logging { def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient } - lazy val isRemotingEnabled = remoteClientObjectInstance.isDefined + lazy val isEnabled = remoteClientObjectInstance.isDefined - def ensureRemotingEnabled = if (!isRemotingEnabled) throw new ModuleNotAvailableException( + def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException( "Can't load the remoting module, make sure that akka-remote.jar is on the classpath") val remoteClientObjectInstance: Option[RemoteClientObject] = getObjectFor("akka.remote.RemoteClient$") def register(address: InetSocketAddress, uuid: Uuid) = { - ensureRemotingEnabled + ensureEnabled remoteClientObjectInstance.get.register(address.getHostName, address.getPort, uuid) } def unregister(address: InetSocketAddress, uuid: Uuid) = { - ensureRemotingEnabled + ensureEnabled remoteClientObjectInstance.get.unregister(address.getHostName, address.getPort, uuid) } def registerSupervisorForActor(remoteAddress: InetSocketAddress, actorRef: ActorRef) = { - ensureRemotingEnabled + ensureEnabled val remoteClient = remoteClientObjectInstance.get.clientFor(remoteAddress) remoteClient.registerSupervisorForActor(actorRef) } def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient = { - ensureRemotingEnabled + ensureEnabled remoteClientObjectInstance.get.clientFor(hostname, port, loader) } @@ -95,7 +95,7 @@ object ReflectiveAccess extends Logging { actorRef: ActorRef, typedActorInfo: Option[Tuple2[String, String]], actorType: ActorType): Option[CompletableFuture[T]] = { - ensureRemotingEnabled + ensureEnabled clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T]( message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType) } @@ -126,17 +126,17 @@ object ReflectiveAccess extends Logging { getObjectFor("akka.remote.RemoteNode$") def registerActor(address: InetSocketAddress, actorRef: ActorRef) = { - ensureRemotingEnabled + RemoteClientModule.ensureEnabled remoteServerObjectInstance.get.registerActor(address, actorRef) } def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) = { - ensureRemotingEnabled + RemoteClientModule.ensureEnabled remoteServerObjectInstance.get.registerTypedActor(address, implementationClassName, proxy) } def unregister(actorRef: ActorRef) = { - ensureRemotingEnabled + RemoteClientModule.ensureEnabled remoteNodeObjectInstance.get.unregister(actorRef) } } @@ -156,16 +156,16 @@ object ReflectiveAccess extends Logging { def stop(anyRef: AnyRef) : Unit } - lazy val isTypedActorEnabled = typedActorObjectInstance.isDefined + lazy val isEnabled = typedActorObjectInstance.isDefined - def ensureTypedActorEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException( + def ensureEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException( "Can't load the typed actor module, make sure that akka-typed-actor.jar is on the classpath") val typedActorObjectInstance: Option[TypedActorObject] = getObjectFor("akka.actor.TypedActor$") def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = { - ensureTypedActorEnabled + ensureEnabled if (typedActorObjectInstance.get.isJoinPointAndOneWay(message)) { future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) } @@ -173,7 +173,7 @@ object ReflectiveAccess extends Logging { } } - object EnterpriseModule { + object AkkaCloudModule { type Mailbox = { def enqueue(message: MessageInvocation) @@ -185,27 +185,27 @@ object ReflectiveAccess extends Logging { def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef } - lazy val isEnterpriseEnabled = clusterObjectInstance.isDefined + lazy val isEnabled = clusterObjectInstance.isDefined val clusterObjectInstance: Option[AnyRef] = - getObjectFor("akka.cluster.Cluster$") + getObjectFor("akka.cloud.cluster.Cluster$") val serializerClass: Option[Class[_]] = getClassFor("akka.serialization.Serializer") - def ensureEnterpriseEnabled = if (!isEnterpriseEnabled) throw new ModuleNotAvailableException( - "Feature is only available in Akka Enterprise edition") + def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException( + "Feature is only available in Akka Cloud") - def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.FileBasedMailbox", actorRef) + def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.FileBasedMailbox", actorRef) - def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.ZooKeeperBasedMailbox", actorRef) + def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.ZooKeeperBasedMailbox", actorRef) - def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.BeanstalkBasedMailbox", actorRef) + def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.BeanstalkBasedMailbox", actorRef) - def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.RedisBasedMailbox", actorRef) + def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.RedisBasedMailbox", actorRef) private def createMailbox(mailboxClassname: String, actorRef: ActorRef): Mailbox = { - ensureEnterpriseEnabled + ensureEnabled createInstance( mailboxClassname, Array(classOf[ActorRef]), diff --git a/akka-actor/src/test/scala/akka/misc/ActorRegistrySpec.scala b/akka-actor/src/test/scala/akka/misc/ActorRegistrySpec.scala index 6148b04f53..f951281f2e 100644 --- a/akka-actor/src/test/scala/akka/misc/ActorRegistrySpec.scala +++ b/akka-actor/src/test/scala/akka/misc/ActorRegistrySpec.scala @@ -27,7 +27,6 @@ object ActorRegistrySpec { self.reply("got ping") } } - } class ActorRegistrySpec extends JUnitSuite { From 47246b2a27eb545d94fb07540ad66ca38d971245 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 22 Nov 2010 11:57:17 +0100 Subject: [PATCH 10/10] Merging in Actor per Session + fixing blocking problem with remote typed actors with Future response types --- .../main/scala/akka/remote/RemoteServer.scala | 91 ++++++++++--------- 1 file changed, 48 insertions(+), 43 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteServer.scala b/akka-remote/src/main/scala/akka/remote/RemoteServer.scala index 6dd418b046..1d79978559 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteServer.scala @@ -326,32 +326,25 @@ class RemoteServer extends Logging with ListenerManagement { } private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { - // TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here - if (_isRunning && !registry.contains(id)) { - registry.put(id, actorRef); + if (_isRunning) { + registry.put(id, actorRef) //TODO change to putIfAbsent if (!actorRef.isRunning) actorRef.start } } private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) { - // TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here - if (_isRunning && !registry.contains(id)) { - registry.put(id, factory) - } + if (_isRunning) + registry.put(id, factory) //TODO change to putIfAbsent } private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) { - // TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here - if (_isRunning && !registry.contains(id)) { - registry.put(id, typedActor) - } + if (_isRunning) + registry.put(id, typedActor) //TODO change to putIfAbsent } private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) { - // TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here - if (_isRunning && !registry.contains(id)) { - registry.put(id, factory) - } + if (_isRunning) + registry.put(id, factory) //TODO change to putIfAbsent } /** @@ -531,18 +524,21 @@ class RemoteServerHandler( val clientAddress = getClientAddress(ctx) log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name) // stop all session actors - val channelActors = sessionActors.remove(event.getChannel()) - val channelActorsIterator = channelActors.elements() - while (channelActorsIterator.hasMoreElements()) { - channelActorsIterator.nextElement().stop() + val channelActors = sessionActors.remove(event.getChannel) + if (channelActors ne null) { + val channelActorsIterator = channelActors.elements + while (channelActorsIterator.hasMoreElements) { + channelActorsIterator.nextElement.stop + } } - val channelTypedActors = typedSessionActors.remove(event.getChannel()) - val channelTypedActorsIterator = channelTypedActors.elements() - while (channelTypedActorsIterator.hasMoreElements()) { - TypedActor.stop(channelTypedActorsIterator.nextElement()) + val channelTypedActors = typedSessionActors.remove(event.getChannel) + if (channelTypedActors ne null) { + val channelTypedActorsIterator = channelTypedActors.elements + while (channelTypedActorsIterator.hasMoreElements) { + TypedActor.stop(channelTypedActorsIterator.nextElement) + } } - server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) } @@ -629,7 +625,7 @@ class RemoteServerHandler( val exception = f.exception if (exception.isDefined) { - log.debug("Returning exception from actor invocation [%s]".format(exception.get)) + log.debug("Returning exception from actor invocation [%s]",exception.get) try { channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) } catch { @@ -679,26 +675,35 @@ class RemoteServerHandler( val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*) if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) else { - val result = messageReceiver.invoke(typedActor, args: _*) match { - case f: Future[_] => f.await.result.get //TODO replace this with a Listener on the Future to avoid blocking - case other => other + //Sends the response + def sendResponse(result: Either[Any,Throwable]): Unit = try { + val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( + None, + Right(request.getUuid), + actorInfo.getId, + actorInfo.getTarget, + actorInfo.getTimeout, + result, + true, + None, + None, + AkkaActorType.TypedActor, + None) + if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) + channel.write(messageBuilder.build) + log.debug("Returning result from remote typed actor invocation [%s]", result) + } catch { + case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) } - log.debug("Returning result from remote typed actor invocation [%s]", result) - val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( - None, - Right(request.getUuid), - actorInfo.getId, - actorInfo.getTarget, - actorInfo.getTimeout, - Left(result), - true, - None, - None, - AkkaActorType.TypedActor, - None) - if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(messageBuilder.build) + messageReceiver.invoke(typedActor, args: _*) match { + case f: Future[_] => //If it's a future, we can lift on that to defer the send to when the future is completed + f.onComplete( future => { + val result: Either[Any,Throwable] = if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get) + sendResponse(result) + }) + case other => sendResponse(Left(other)) + } } } catch { case e: InvocationTargetException =>