Cleaned up patch as suggested by Vicktor

This commit is contained in:
Paul Pacheco 2010-11-18 11:09:55 -06:00
parent 16640ebc50
commit 8c35885702

View file

@ -326,25 +326,28 @@ class RemoteServer extends Logging with ListenerManagement {
} }
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
if (_isRunning && !registry.contains(id)) { if (_isRunning) {
if (registry.putIfAbsent(id, actorRef) eq null) {
if (!actorRef.isRunning) actorRef.start if (!actorRef.isRunning) actorRef.start
registry.put(id, actorRef) }
} }
} }
private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) { private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) {
if (_isRunning && !registry.contains(id)) { if (_isRunning) {
registry.put(id, factory) registry.putIfAbsent(id, factory);
} }
} }
private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) { private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) {
if (_isRunning && !registry.contains(id)) registry.put(id, typedActor) if (_isRunning) {
registry.putIfAbsent(id, typedActor);
}
} }
private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) { private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) {
if (_isRunning && !registry.contains(id)) { if (_isRunning) {
registry.put(id, factory) registry.putIfAbsent(id, factory);
} }
} }
@ -381,7 +384,7 @@ class RemoteServer extends Logging with ListenerManagement {
* <p/> * <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID. * NOTE: You need to call this method if you have registered an actor by a custom ID.
*/ */
def unregisterPerSession(id: String):Unit = synchronized { def unregisterPerSession(id: String):Unit = {
if (_isRunning) { if (_isRunning) {
log.info("Unregistering server side remote session actor with id [%s]", id) log.info("Unregistering server side remote session actor with id [%s]", id)
actorsFactories.remove(id) actorsFactories.remove(id)
@ -406,7 +409,7 @@ class RemoteServer extends Logging with ListenerManagement {
* <p/> * <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID. * NOTE: You need to call this method if you have registered an actor by a custom ID.
*/ */
def unregisterTypedPerSessionActor(id: String):Unit = synchronized { def unregisterTypedPerSessionActor(id: String):Unit = {
if (_isRunning) { if (_isRunning) {
typedActorsFactories.remove(id) typedActorsFactories.remove(id)
} }
@ -490,8 +493,8 @@ class RemoteServerHandler(
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
val CHANNEL_INIT = "channel-init".intern val CHANNEL_INIT = "channel-init".intern
val sessionActors = new ChannelLocal[Map[String, ActorRef]](); val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]();
val typedSessionActors = new ChannelLocal[Map[String, AnyRef]](); val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]();
applicationLoader.foreach(MessageSerializer.setClassLoader(_)) applicationLoader.foreach(MessageSerializer.setClassLoader(_))
@ -503,8 +506,8 @@ class RemoteServerHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx) val clientAddress = getClientAddress(ctx)
sessionActors.set(event.getChannel(), Map[String, ActorRef]()); sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]());
typedSessionActors.set(event.getChannel(), Map[String, AnyRef]()); typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]());
log.debug("Remote client [%s] connected to [%s]", clientAddress, server.name) log.debug("Remote client [%s] connected to [%s]", clientAddress, server.name)
if (RemoteServer.SECURE) { if (RemoteServer.SECURE) {
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
@ -525,14 +528,12 @@ class RemoteServerHandler(
val clientAddress = getClientAddress(ctx) val clientAddress = getClientAddress(ctx)
log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name) log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name)
// stop all session actors // stop all session actors
for ((id, actorRef) <- sessionActors.get(event.getChannel())) { for ((id, actorRef) <- sessionActors.remove(event.getChannel())) {
actorRef.stop() actorRef.stop()
} }
sessionActors.remove(event.getChannel()); for ((id, actorRef) <- typedSessionActors.remove(event.getChannel())) {
for ((id, actorRef) <- typedSessionActors.get(event.getChannel())) {
TypedActor.stop(actorRef) TypedActor.stop(actorRef)
} }
typedSessionActors.remove(event.getChannel());
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
} }
@ -714,7 +715,7 @@ class RemoteServerHandler(
} }
private def findSessionActor(id: String, channel: Channel) : ActorRef = { private def findSessionActor(id: String, channel: Channel) : ActorRef = {
sessionActors.get(channel).getOrElse(id, null) sessionActors.get(channel).get(id)
} }
private def findTypedActorById(id: String) : AnyRef = { private def findTypedActorById(id: String) : AnyRef = {
@ -726,7 +727,7 @@ class RemoteServerHandler(
} }
private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = { private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = {
typedSessionActors.get(channel).getOrElse(id, null) typedSessionActors.get(channel).get(id)
} }
private def findTypedActorByUuid(uuid: String) : AnyRef = { private def findTypedActorByUuid(uuid: String) : AnyRef = {
@ -767,12 +768,11 @@ class RemoteServerHandler(
return actorRefOrNull return actorRefOrNull
// the actor has not been registered globally. See if we have it in the session
val sessionActorRefOrNull = findSessionActor(id, channel); val sessionActorRefOrNull = findSessionActor(id, channel);
if (sessionActorRefOrNull ne null) if (sessionActorRefOrNull ne null)
return sessionActorRefOrNull return sessionActorRefOrNull
// we dont have it in the session either, see if we have a factory for it // we dont have it in the session either, see if we have a factory for it
val actorFactoryOrNull = findActorFactory(id) val actorFactoryOrNull = findActorFactory(id)
if (actorFactoryOrNull ne null) { if (actorFactoryOrNull ne null) {