diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala index 6751ee9350..0b946df99f 100644 --- a/akka-actor/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala @@ -301,12 +301,14 @@ object ActorRegistry extends ListenerManagement { private[akka] def actors(address: Address) = actorsFor(address).actors private[akka] def actorsByUuid(address: Address) = actorsFor(address).actorsByUuid + private[akka] def actorsFactories(address: Address) = actorsFor(address).actorsFactories private[akka] def typedActors(address: Address) = actorsFor(address).typedActors private[akka] def typedActorsByUuid(address: Address) = actorsFor(address).typedActorsByUuid private[akka] class RemoteActorSet { private[ActorRegistry] val actors = new ConcurrentHashMap[String, ActorRef] private[ActorRegistry] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] + private[ActorRegistry] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef] private[ActorRegistry] val typedActors = new ConcurrentHashMap[String, AnyRef] private[ActorRegistry] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] } diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index c85f2913e0..cead8bf9f8 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -302,6 +302,17 @@ class RemoteServer extends Logging with ListenerManagement { else register(id, actorRef, actors) } + /** + * Register Remote Session Actor by a specific 'id' passed as argument. + *
+ * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. + */ + def registerPerSession(id: String, factory: => ActorRef): Unit = synchronized { + log.debug("Registering server side remote session actor with id [%s]", id) + if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), factory, actorsByUuid) + else registerPerSession(id, () => factory, actorsFactories) + } + private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { if (_isRunning && !registry.contains(id)) { if (!actorRef.isRunning) actorRef.start @@ -309,6 +320,12 @@ class RemoteServer extends Logging with ListenerManagement { } } + private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) { + if (_isRunning && !registry.contains(id)) { + registry.put(id, factory) + } + } + private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) { if (_isRunning && !registry.contains(id)) registry.put(id, typedActor) } @@ -341,6 +358,18 @@ class RemoteServer extends Logging with ListenerManagement { } } + /** + * Unregister Remote Actor by specific 'id'. + * + * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregisterPerSession(id: String):Unit = synchronized { + if (_isRunning) { + log.info("Unregistering server side remote session actor with id [%s]", id) + actorsFactories.remove(id) + } + } + /** * Unregister Remote Typed Actor by specific 'id'. * @@ -358,8 +387,10 @@ class RemoteServer extends Logging with ListenerManagement { protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) + private[akka] def actors = ActorRegistry.actors(address) private[akka] def actorsByUuid = ActorRegistry.actorsByUuid(address) + private[akka] def actorsFactories = ActorRegistry.actorsFactories(address) private[akka] def typedActors = ActorRegistry.typedActors(address) private[akka] def typedActorsByUuid = ActorRegistry.typedActorsByUuid(address) } @@ -429,6 +460,8 @@ class RemoteServerHandler( val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val CHANNEL_INIT = "channel-init".intern + val sessionActors = new ChannelLocal(); + applicationLoader.foreach(MessageSerializer.setClassLoader(_)) /** diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index ef60732852..ff8793218e 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -36,6 +36,22 @@ object ServerInitiatedRemoteActorSpec { } } + case class Login(user:String); + case class GetUser(); + + class RemoteStatefullSessionActorSpec extends Actor { + + var user : String= _; + + def receive = { + case Login(user) => + this.user = user; + case GetUser() => + self.reply(this.user) + } + } + + object RemoteActorSpecActorAsyncSender { val latch = new CountDownLatch(1) } @@ -63,6 +79,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { server.register(actorOf[RemoteActorSpecActorUnidirectional]) server.register(actorOf[RemoteActorSpecActorBidirectional]) server.register(actorOf[RemoteActorSpecActorAsyncSender]) + server.registerPerSession("statefull-session-actor", actorOf[RemoteStatefullSessionActorSpec]) Thread.sleep(1000) } @@ -103,6 +120,29 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { actor.stop } + @Test + def shouldKeepSessionInformation { + val session1 = RemoteClient.actorFor( + "statefull-session-actor", + 5000L, + HOSTNAME, PORT) + val session2 = RemoteClient.actorFor( + "statefull-session-actor", + 5000L, + HOSTNAME, PORT) + + session1 ! Login("session1"); + session2 ! Login("session2"); + + val result1 = session1 !! GetUser(); + assert("session1" === result1.get.asInstanceOf[String]) + val result2 = session2 !! GetUser(); + assert("session2" === result2.get.asInstanceOf[String]) + + session1.stop() + session2.stop() + } + @Test def shouldSendWithBangAndGetReplyThroughSenderRef { implicit val timeout = 500000000L @@ -205,6 +245,13 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { server.unregister("my-service-1") assert(server.actors.get("my-service-1") eq null, "actor unregistered") } + @Test + def shouldRegisterAndUnregisterSession { + server.registerPerSession("my-service-1", actorOf[RemoteActorSpecActorUnidirectional]) + assert(server.actorsFactories.get("my-service-1") ne null, "actor registered") + server.unregisterPerSession("my-service-1") + assert(server.actorsFactories.get("my-service-1") eq null, "actor unregistered") + } @Test def shouldRegisterAndUnregisterByUuid {