diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 00eb9910fa..b7a4089fca 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -17,8 +17,7 @@ import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, import akka.AkkaException import akka.actor.Actor._ import akka.util._ -import akka.remote.MessageSerializer -import akka.remote.{RemoteClientSettings, RemoteServerSettings} +import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} import org.jboss.netty.channel._ import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup} @@ -152,22 +151,12 @@ abstract class RemoteClient private[akka] ( private[remote] val runSwitch = new Switch() private[remote] val isAuthenticated = new AtomicBoolean(false) - /** - * Is this client currently running? - */ private[remote] def isRunning = runSwitch.isOn protected def notifyListeners(msg: => Any); Unit protected def currentChannel: Channel - /** - * Pretty self explanatory? - */ def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean - - /** - * Shuts this client down and releases any resources attached - */ def shutdown: Boolean /** @@ -259,10 +248,8 @@ abstract class RemoteClient private[akka] ( * @author Jonas Bonér */ class ActiveRemoteClient private[akka] ( - module: NettyRemoteClientModule, - remoteAddress: InetSocketAddress, - val loader: Option[ClassLoader] = None, - notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) { + module: NettyRemoteClientModule, remoteAddress: InetSocketAddress, + val loader: Option[ClassLoader] = None, notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) { import RemoteClientSettings._ //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @volatile private var bootstrap: ClientBootstrap = _ @@ -860,17 +847,19 @@ class RemoteServerHandler( // stop all session actors val channelActors = sessionActors.remove(event.getChannel) if (channelActors ne null) { - val channelActorsIterator = channelActors.elements - while (channelActorsIterator.hasMoreElements) { - channelActorsIterator.nextElement.stop + val elems = channelActors.elements + while (elems.hasMoreElements) { + val actor = elems.nextElement + try { actor.stop } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e)} } } val channelTypedActors = typedSessionActors.remove(event.getChannel) if (channelTypedActors ne null) { - val channelTypedActorsIterator = channelTypedActors.elements - while (channelTypedActorsIterator.hasMoreElements) { - TypedActor.stop(channelTypedActorsIterator.nextElement) + val elems = channelTypedActors.elements + while (elems.hasMoreElements) { + val actor = elems.nextElement + try { TypedActor.stop(actor) } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e)} } } @@ -904,11 +893,11 @@ class RemoteServerHandler( server.notifyListeners(RemoteServerError(event.getCause, server)) } - private def getClientAddress(ctx: ChannelHandlerContext): Option[InetSocketAddress] = { - val remoteAddress = ctx.getChannel.getRemoteAddress - if (remoteAddress.isInstanceOf[InetSocketAddress]) Some(remoteAddress.asInstanceOf[InetSocketAddress]) - else None - } + private def getClientAddress(ctx: ChannelHandlerContext): Option[InetSocketAddress] = + ctx.getChannel.getRemoteAddress match { + case inet: InetSocketAddress => Some(inet) + case _ => None + } private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = { log.slf4j.debug("Received RemoteMessageProtocol[\n{}]",request) @@ -1051,17 +1040,17 @@ class RemoteServerHandler( } } - private def findSessionActor(id: String, channel: Channel) : ActorRef = { - val map = sessionActors.get(channel) - if (map ne null) map.get(id) - else null - } + private def findSessionActor(id: String, channel: Channel) : ActorRef = + sessionActors.get(channel) match { + case null => null + case map => map get id + } - private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = { - val map = typedSessionActors.get(channel) - if (map ne null) map.get(id) - else null - } + private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = + typedSessionActors.get(channel) match { + case null => null + case map => map get id + } /** * gets the actor from the session, or creates one if there is a factory for it @@ -1069,20 +1058,18 @@ class RemoteServerHandler( private def createSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { val uuid = actorInfo.getUuid val id = actorInfo.getId - val sessionActorRefOrNull = findSessionActor(id, channel) - if (sessionActorRefOrNull ne null) { - sessionActorRefOrNull - } else { - // we dont have it in the session either, see if we have a factory for it - val actorFactoryOrNull = server.findActorFactory(id) - if (actorFactoryOrNull ne null) { - val actorRef = actorFactoryOrNull() - actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) - sessionActors.get(channel).put(id, actorRef) - actorRef - } - else - null + + findSessionActor(id, channel) match { + case null => // we dont have it in the session either, see if we have a factory for it + server.findActorFactory(id) match { + case null => null + case factory => + val actorRef = factory() + actorRef.uuid = parseUuid(uuid) //FIXME is this sensible? + sessionActors.get(channel).put(id, actorRef) + actorRef + } + case sessionActor => sessionActor } } @@ -1101,7 +1088,7 @@ class RemoteServerHandler( val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) else Class.forName(name) val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) - actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) + actorRef.uuid = parseUuid(uuid) actorRef.id = id actorRef.timeout = timeout server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid @@ -1126,16 +1113,13 @@ class RemoteServerHandler( val uuid = actorInfo.getUuid val id = actorInfo.getId - val actorRefOrNull = server.findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - - if (actorRefOrNull ne null) - actorRefOrNull - else { // the actor has not been registered globally. See if we have it in the session - val sessionActorRefOrNull = createSessionActor(actorInfo, channel) - if (sessionActorRefOrNull ne null) - sessionActorRefOrNull - else // maybe it is a client managed actor - createClientManagedActor(actorInfo) + server.findActorByIdOrUuid(id, parseUuid(uuid).toString) match { + case null => // the actor has not been registered globally. See if we have it in the session + createSessionActor(actorInfo, channel) match { + case null => createClientManagedActor(actorInfo) // maybe it is a client managed actor + case sessionActor => sessionActor + } + case actorRef => actorRef } } @@ -1144,20 +1128,17 @@ class RemoteServerHandler( */ private def createTypedSessionActor(actorInfo: ActorInfoProtocol, channel: Channel):AnyRef ={ val id = actorInfo.getId - val sessionActorRefOrNull = findTypedSessionActor(id, channel) - if (sessionActorRefOrNull ne null) - sessionActorRefOrNull - else { - val actorFactoryOrNull = server.findTypedActorFactory(id) - if (actorFactoryOrNull ne null) { - val newInstance = actorFactoryOrNull() - typedSessionActors.get(channel).put(id, newInstance) - newInstance - } - else - null + findTypedSessionActor(id, channel) match { + case null => + server.findTypedActorFactory(id) match { + case null => null + case factory => + val newInstance = factory() + typedSessionActors.get(channel).put(id, newInstance) + newInstance + } + case sessionActor => sessionActor } - } private def createClientManagedTypedActor(actorInfo: ActorInfoProtocol) = { @@ -1179,7 +1160,7 @@ class RemoteServerHandler( val newInstance = TypedActor.newInstance( interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef] - server.typedActors.put(uuidFrom(uuid.getHigh,uuid.getLow).toString, newInstance) // register by uuid + server.typedActors.put(parseUuid(uuid).toString, newInstance) // register by uuid newInstance } catch { case e => @@ -1191,19 +1172,14 @@ class RemoteServerHandler( private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = { val uuid = actorInfo.getUuid - val id = actorInfo.getId - val typedActorOrNull = server.findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString) - if (typedActorOrNull ne null) - typedActorOrNull - else - { - // the actor has not been registered globally. See if we have it in the session - val sessionActorRefOrNull = createTypedSessionActor(actorInfo, channel) - if (sessionActorRefOrNull ne null) - sessionActorRefOrNull - else // maybe it is a client managed actor - createClientManagedTypedActor(actorInfo) + server.findTypedActorByIdOrUuid(actorInfo.getId, parseUuid(uuid).toString) match { + case null => // the actor has not been registered globally. See if we have it in the session + createTypedSessionActor(actorInfo, channel) match { + case null => createClientManagedTypedActor(actorInfo) //Maybe client managed actor? + case sessionActor => sessionActor + } + case typedActor => typedActor } } @@ -1241,4 +1217,6 @@ class RemoteServerHandler( log.slf4j.info("Remote client [{}] successfully authenticated using secure cookie", clientAddress) } } + + protected def parseUuid(protocol: UuidProtocol): Uuid = uuidFrom(protocol.getHigh,protocol.getLow) }