Major code clanup, switched from nested ifs to match statements etc
This commit is contained in:
parent
a61e591b2a
commit
7a0e8a82de
1 changed files with 66 additions and 88 deletions
|
|
@ -17,8 +17,7 @@ import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid,
|
|||
import akka.AkkaException
|
||||
import akka.actor.Actor._
|
||||
import akka.util._
|
||||
import akka.remote.MessageSerializer
|
||||
import akka.remote.{RemoteClientSettings, RemoteServerSettings}
|
||||
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
|
||||
|
||||
import org.jboss.netty.channel._
|
||||
import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup}
|
||||
|
|
@ -152,22 +151,12 @@ abstract class RemoteClient private[akka] (
|
|||
private[remote] val runSwitch = new Switch()
|
||||
private[remote] val isAuthenticated = new AtomicBoolean(false)
|
||||
|
||||
/**
|
||||
* Is this client currently running?
|
||||
*/
|
||||
private[remote] def isRunning = runSwitch.isOn
|
||||
|
||||
protected def notifyListeners(msg: => Any); Unit
|
||||
protected def currentChannel: Channel
|
||||
|
||||
/**
|
||||
* Pretty self explanatory?
|
||||
*/
|
||||
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean
|
||||
|
||||
/**
|
||||
* Shuts this client down and releases any resources attached
|
||||
*/
|
||||
def shutdown: Boolean
|
||||
|
||||
/**
|
||||
|
|
@ -259,10 +248,8 @@ abstract class RemoteClient private[akka] (
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class ActiveRemoteClient private[akka] (
|
||||
module: NettyRemoteClientModule,
|
||||
remoteAddress: InetSocketAddress,
|
||||
val loader: Option[ClassLoader] = None,
|
||||
notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) {
|
||||
module: NettyRemoteClientModule, remoteAddress: InetSocketAddress,
|
||||
val loader: Option[ClassLoader] = None, notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) {
|
||||
import RemoteClientSettings._
|
||||
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
|
||||
@volatile private var bootstrap: ClientBootstrap = _
|
||||
|
|
@ -860,17 +847,19 @@ class RemoteServerHandler(
|
|||
// stop all session actors
|
||||
val channelActors = sessionActors.remove(event.getChannel)
|
||||
if (channelActors ne null) {
|
||||
val channelActorsIterator = channelActors.elements
|
||||
while (channelActorsIterator.hasMoreElements) {
|
||||
channelActorsIterator.nextElement.stop
|
||||
val elems = channelActors.elements
|
||||
while (elems.hasMoreElements) {
|
||||
val actor = elems.nextElement
|
||||
try { actor.stop } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e)}
|
||||
}
|
||||
}
|
||||
|
||||
val channelTypedActors = typedSessionActors.remove(event.getChannel)
|
||||
if (channelTypedActors ne null) {
|
||||
val channelTypedActorsIterator = channelTypedActors.elements
|
||||
while (channelTypedActorsIterator.hasMoreElements) {
|
||||
TypedActor.stop(channelTypedActorsIterator.nextElement)
|
||||
val elems = channelTypedActors.elements
|
||||
while (elems.hasMoreElements) {
|
||||
val actor = elems.nextElement
|
||||
try { TypedActor.stop(actor) } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e)}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -904,11 +893,11 @@ class RemoteServerHandler(
|
|||
server.notifyListeners(RemoteServerError(event.getCause, server))
|
||||
}
|
||||
|
||||
private def getClientAddress(ctx: ChannelHandlerContext): Option[InetSocketAddress] = {
|
||||
val remoteAddress = ctx.getChannel.getRemoteAddress
|
||||
if (remoteAddress.isInstanceOf[InetSocketAddress]) Some(remoteAddress.asInstanceOf[InetSocketAddress])
|
||||
else None
|
||||
}
|
||||
private def getClientAddress(ctx: ChannelHandlerContext): Option[InetSocketAddress] =
|
||||
ctx.getChannel.getRemoteAddress match {
|
||||
case inet: InetSocketAddress => Some(inet)
|
||||
case _ => None
|
||||
}
|
||||
|
||||
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
|
||||
log.slf4j.debug("Received RemoteMessageProtocol[\n{}]",request)
|
||||
|
|
@ -1051,17 +1040,17 @@ class RemoteServerHandler(
|
|||
}
|
||||
}
|
||||
|
||||
private def findSessionActor(id: String, channel: Channel) : ActorRef = {
|
||||
val map = sessionActors.get(channel)
|
||||
if (map ne null) map.get(id)
|
||||
else null
|
||||
}
|
||||
private def findSessionActor(id: String, channel: Channel) : ActorRef =
|
||||
sessionActors.get(channel) match {
|
||||
case null => null
|
||||
case map => map get id
|
||||
}
|
||||
|
||||
private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = {
|
||||
val map = typedSessionActors.get(channel)
|
||||
if (map ne null) map.get(id)
|
||||
else null
|
||||
}
|
||||
private def findTypedSessionActor(id: String, channel: Channel) : AnyRef =
|
||||
typedSessionActors.get(channel) match {
|
||||
case null => null
|
||||
case map => map get id
|
||||
}
|
||||
|
||||
/**
|
||||
* gets the actor from the session, or creates one if there is a factory for it
|
||||
|
|
@ -1069,20 +1058,18 @@ class RemoteServerHandler(
|
|||
private def createSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val id = actorInfo.getId
|
||||
val sessionActorRefOrNull = findSessionActor(id, channel)
|
||||
if (sessionActorRefOrNull ne null) {
|
||||
sessionActorRefOrNull
|
||||
} else {
|
||||
// we dont have it in the session either, see if we have a factory for it
|
||||
val actorFactoryOrNull = server.findActorFactory(id)
|
||||
if (actorFactoryOrNull ne null) {
|
||||
val actorRef = actorFactoryOrNull()
|
||||
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
|
||||
sessionActors.get(channel).put(id, actorRef)
|
||||
actorRef
|
||||
}
|
||||
else
|
||||
null
|
||||
|
||||
findSessionActor(id, channel) match {
|
||||
case null => // we dont have it in the session either, see if we have a factory for it
|
||||
server.findActorFactory(id) match {
|
||||
case null => null
|
||||
case factory =>
|
||||
val actorRef = factory()
|
||||
actorRef.uuid = parseUuid(uuid) //FIXME is this sensible?
|
||||
sessionActors.get(channel).put(id, actorRef)
|
||||
actorRef
|
||||
}
|
||||
case sessionActor => sessionActor
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1101,7 +1088,7 @@ class RemoteServerHandler(
|
|||
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
|
||||
else Class.forName(name)
|
||||
val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]])
|
||||
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
|
||||
actorRef.uuid = parseUuid(uuid)
|
||||
actorRef.id = id
|
||||
actorRef.timeout = timeout
|
||||
server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid
|
||||
|
|
@ -1126,16 +1113,13 @@ class RemoteServerHandler(
|
|||
val uuid = actorInfo.getUuid
|
||||
val id = actorInfo.getId
|
||||
|
||||
val actorRefOrNull = server.findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString)
|
||||
|
||||
if (actorRefOrNull ne null)
|
||||
actorRefOrNull
|
||||
else { // the actor has not been registered globally. See if we have it in the session
|
||||
val sessionActorRefOrNull = createSessionActor(actorInfo, channel)
|
||||
if (sessionActorRefOrNull ne null)
|
||||
sessionActorRefOrNull
|
||||
else // maybe it is a client managed actor
|
||||
createClientManagedActor(actorInfo)
|
||||
server.findActorByIdOrUuid(id, parseUuid(uuid).toString) match {
|
||||
case null => // the actor has not been registered globally. See if we have it in the session
|
||||
createSessionActor(actorInfo, channel) match {
|
||||
case null => createClientManagedActor(actorInfo) // maybe it is a client managed actor
|
||||
case sessionActor => sessionActor
|
||||
}
|
||||
case actorRef => actorRef
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1144,20 +1128,17 @@ class RemoteServerHandler(
|
|||
*/
|
||||
private def createTypedSessionActor(actorInfo: ActorInfoProtocol, channel: Channel):AnyRef ={
|
||||
val id = actorInfo.getId
|
||||
val sessionActorRefOrNull = findTypedSessionActor(id, channel)
|
||||
if (sessionActorRefOrNull ne null)
|
||||
sessionActorRefOrNull
|
||||
else {
|
||||
val actorFactoryOrNull = server.findTypedActorFactory(id)
|
||||
if (actorFactoryOrNull ne null) {
|
||||
val newInstance = actorFactoryOrNull()
|
||||
typedSessionActors.get(channel).put(id, newInstance)
|
||||
newInstance
|
||||
}
|
||||
else
|
||||
null
|
||||
findTypedSessionActor(id, channel) match {
|
||||
case null =>
|
||||
server.findTypedActorFactory(id) match {
|
||||
case null => null
|
||||
case factory =>
|
||||
val newInstance = factory()
|
||||
typedSessionActors.get(channel).put(id, newInstance)
|
||||
newInstance
|
||||
}
|
||||
case sessionActor => sessionActor
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private def createClientManagedTypedActor(actorInfo: ActorInfoProtocol) = {
|
||||
|
|
@ -1179,7 +1160,7 @@ class RemoteServerHandler(
|
|||
|
||||
val newInstance = TypedActor.newInstance(
|
||||
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
|
||||
server.typedActors.put(uuidFrom(uuid.getHigh,uuid.getLow).toString, newInstance) // register by uuid
|
||||
server.typedActors.put(parseUuid(uuid).toString, newInstance) // register by uuid
|
||||
newInstance
|
||||
} catch {
|
||||
case e =>
|
||||
|
|
@ -1191,19 +1172,14 @@ class RemoteServerHandler(
|
|||
|
||||
private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val id = actorInfo.getId
|
||||
|
||||
val typedActorOrNull = server.findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString)
|
||||
if (typedActorOrNull ne null)
|
||||
typedActorOrNull
|
||||
else
|
||||
{
|
||||
// the actor has not been registered globally. See if we have it in the session
|
||||
val sessionActorRefOrNull = createTypedSessionActor(actorInfo, channel)
|
||||
if (sessionActorRefOrNull ne null)
|
||||
sessionActorRefOrNull
|
||||
else // maybe it is a client managed actor
|
||||
createClientManagedTypedActor(actorInfo)
|
||||
server.findTypedActorByIdOrUuid(actorInfo.getId, parseUuid(uuid).toString) match {
|
||||
case null => // the actor has not been registered globally. See if we have it in the session
|
||||
createTypedSessionActor(actorInfo, channel) match {
|
||||
case null => createClientManagedTypedActor(actorInfo) //Maybe client managed actor?
|
||||
case sessionActor => sessionActor
|
||||
}
|
||||
case typedActor => typedActor
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1241,4 +1217,6 @@ class RemoteServerHandler(
|
|||
log.slf4j.info("Remote client [{}] successfully authenticated using secure cookie", clientAddress)
|
||||
}
|
||||
}
|
||||
|
||||
protected def parseUuid(protocol: UuidProtocol): Uuid = uuidFrom(protocol.getHigh,protocol.getLow)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue