From 8c3588570256e7cca9538c7fbda83a3dbdec05cd Mon Sep 17 00:00:00 2001 From: Paul Pacheco Date: Thu, 18 Nov 2010 11:09:55 -0600 Subject: [PATCH] Cleaned up patch as suggested by Vicktor --- .../src/main/scala/remote/RemoteServer.scala | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 9afef140b1..4c5442de65 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -326,25 +326,28 @@ class RemoteServer extends Logging with ListenerManagement { } private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { - if (_isRunning && !registry.contains(id)) { - if (!actorRef.isRunning) actorRef.start - registry.put(id, actorRef) + if (_isRunning) { + if (registry.putIfAbsent(id, actorRef) eq null) { + if (!actorRef.isRunning) actorRef.start + } } } private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) { - if (_isRunning && !registry.contains(id)) { - registry.put(id, factory) + if (_isRunning) { + registry.putIfAbsent(id, factory); } } private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) { - if (_isRunning && !registry.contains(id)) registry.put(id, typedActor) + if (_isRunning) { + registry.putIfAbsent(id, typedActor); + } } private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) { - if (_isRunning && !registry.contains(id)) { - registry.put(id, factory) + if (_isRunning) { + registry.putIfAbsent(id, factory); } } @@ -381,7 +384,7 @@ class RemoteServer extends Logging with ListenerManagement { *

* NOTE: You need to call this method if you have registered an actor by a custom ID. */ - def unregisterPerSession(id: String):Unit = synchronized { + def unregisterPerSession(id: String):Unit = { if (_isRunning) { log.info("Unregistering server side remote session actor with id [%s]", id) actorsFactories.remove(id) @@ -406,7 +409,7 @@ class RemoteServer extends Logging with ListenerManagement { *

* NOTE: You need to call this method if you have registered an actor by a custom ID. */ - def unregisterTypedPerSessionActor(id: String):Unit = synchronized { + def unregisterTypedPerSessionActor(id: String):Unit = { if (_isRunning) { typedActorsFactories.remove(id) } @@ -490,8 +493,8 @@ class RemoteServerHandler( val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val CHANNEL_INIT = "channel-init".intern - val sessionActors = new ChannelLocal[Map[String, ActorRef]](); - val typedSessionActors = new ChannelLocal[Map[String, AnyRef]](); + val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]](); + val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]](); applicationLoader.foreach(MessageSerializer.setClassLoader(_)) @@ -503,8 +506,8 @@ class RemoteServerHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) - sessionActors.set(event.getChannel(), Map[String, ActorRef]()); - typedSessionActors.set(event.getChannel(), Map[String, AnyRef]()); + sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()); + typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]()); log.debug("Remote client [%s] connected to [%s]", clientAddress, server.name) if (RemoteServer.SECURE) { val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) @@ -525,15 +528,13 @@ class RemoteServerHandler( val clientAddress = getClientAddress(ctx) log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name) // stop all session actors - for ((id, actorRef) <- sessionActors.get(event.getChannel())) { + for ((id, actorRef) <- sessionActors.remove(event.getChannel())) { actorRef.stop() } - sessionActors.remove(event.getChannel()); - for ((id, actorRef) <- typedSessionActors.get(event.getChannel())) { + for ((id, actorRef) <- typedSessionActors.remove(event.getChannel())) { TypedActor.stop(actorRef) } - typedSessionActors.remove(event.getChannel()); - + server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) } @@ -714,7 +715,7 @@ class RemoteServerHandler( } private def findSessionActor(id: String, channel: Channel) : ActorRef = { - sessionActors.get(channel).getOrElse(id, null) + sessionActors.get(channel).get(id) } private def findTypedActorById(id: String) : AnyRef = { @@ -726,7 +727,7 @@ class RemoteServerHandler( } private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = { - typedSessionActors.get(channel).getOrElse(id, null) + typedSessionActors.get(channel).get(id) } private def findTypedActorByUuid(uuid: String) : AnyRef = { @@ -767,12 +768,11 @@ class RemoteServerHandler( return actorRefOrNull - // the actor has not been registered globally. See if we have it in the session - val sessionActorRefOrNull = findSessionActor(id, channel); if (sessionActorRefOrNull ne null) return sessionActorRefOrNull + // we dont have it in the session either, see if we have a factory for it val actorFactoryOrNull = findActorFactory(id) if (actorFactoryOrNull ne null) {