Added Scalariform sbt plugin which formats code on each compile. Also checking in reformatted code

This commit is contained in:
Jonas Bonér 2011-05-18 17:25:30 +02:00
parent 5949673092
commit a7311c83e6
177 changed files with 4184 additions and 4245 deletions

View file

@ -4,27 +4,39 @@
package akka.remote.netty
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future}
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
import akka.dispatch.{ DefaultCompletableFuture, CompletableFuture, Future }
import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings }
import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
import akka.remoteinterface._
import akka.actor.{PoisonPill, Index, LocalActorRef, Actor, RemoteActorRef,
TypedActor, ActorRef, IllegalActorStateException,
RemoteActorSystemMessage, uuidFrom, Uuid,
Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.actor.{
PoisonPill,
Index,
LocalActorRef,
Actor,
RemoteActorRef,
TypedActor,
ActorRef,
IllegalActorStateException,
RemoteActorSystemMessage,
uuidFrom,
Uuid,
Exit,
LifeCycleMessage,
ActorType AkkaActorType
}
import akka.actor.Actor._
import akka.config.Config._
import akka.util._
import akka.event.EventHandler
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup,ChannelGroupFuture}
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture }
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.bootstrap.{ServerBootstrap, ClientBootstrap}
import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
@ -37,7 +49,7 @@ import scala.collection.JavaConversions._
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import java.util.concurrent._
import akka.AkkaException
@ -57,10 +69,10 @@ object RemoteEncoder {
}
}
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement =>
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement
private val remoteClients = new HashMap[Address, RemoteClient]
private val remoteActors = new Index[Address, Uuid]
private val lock = new ReadWriteGuard
private val remoteActors = new Index[Address, Uuid]
private val lock = new ReadWriteGuard
protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T =
TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(serviceId, timeout, loader, AkkaActorType.TypedActor))
@ -75,24 +87,24 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
typedActorInfo: Option[Tuple2[String, String]],
actorType: AkkaActorType,
loader: Option[ClassLoader]): Option[CompletableFuture[T]] =
withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType))
withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType))
private[akka] def withClientFor[T](
address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient => T): T = {
address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient T): T = {
loader.foreach(MessageSerializer.setClassLoader(_))
val key = Address(address)
lock.readLock.lock
try {
val c = remoteClients.get(key) match {
case s: Some[RemoteClient] => s.get
case None =>
case s: Some[RemoteClient] s.get
case None
lock.readLock.unlock
lock.writeLock.lock //Lock upgrade, not supported natively
try {
try {
remoteClients.get(key) match { //Recheck for addition, race between upgrades
case s: Some[RemoteClient] => s.get //If already populated by other writer
case None => //Populate map
case s: Some[RemoteClient] s.get //If already populated by other writer
case None //Populate map
val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _)
client.connect()
remoteClients += key -> client
@ -107,15 +119,15 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard {
remoteClients.remove(Address(address)) match {
case s: Some[RemoteClient] => s.get.shutdown()
case None => false
case s: Some[RemoteClient] s.get.shutdown()
case None false
}
}
def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard {
remoteClients.get(Address(address)) match {
case s: Some[RemoteClient] => s.get.connect(reconnectIfAlreadyConnected = true)
case None => false
case s: Some[RemoteClient] s.get.connect(reconnectIfAlreadyConnected = true)
case None false
}
}
@ -129,7 +141,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
}
def shutdownRemoteClients() = lock withWriteGuard {
remoteClients.foreach({ case (addr, client) => client.shutdown() })
remoteClients.foreach({ case (addr, client) client.shutdown() })
remoteClients.clear()
}
}
@ -143,25 +155,25 @@ abstract class RemoteClient private[akka] (
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) {
val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", true)
val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", true)
val transactionLogCapacity = config.getInt("akka.remote.client.buffering.capacity", -1)
val name = this.getClass.getSimpleName + "@" +
remoteAddress.getAddress.getHostAddress + "::" +
remoteAddress.getPort
remoteAddress.getAddress.getHostAddress + "::" +
remoteAddress.getPort
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
protected val pendingRequests = {
if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity)
else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity)
}
private[remote] val runSwitch = new Switch()
private[remote] val runSwitch = new Switch()
private[remote] val isAuthenticated = new AtomicBoolean(false)
private[remote] def isRunning = runSwitch.isOn
protected def notifyListeners(msg: => Any): Unit
protected def notifyListeners(msg: Any): Unit
protected def currentChannel: Channel
@ -197,17 +209,16 @@ abstract class RemoteClient private[akka] (
actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { // FIXME: find better strategy to prevent race
send(createRemoteMessageProtocolBuilder(
Some(actorRef),
Left(actorRef.uuid),
actorRef.address,
timeout,
Right(message),
isOneWay,
senderOption,
typedActorInfo,
actorType,
if (isAuthenticated.compareAndSet(false, true)) RemoteClientSettings.SECURE_COOKIE else None
).build, senderFuture)
Some(actorRef),
Left(actorRef.uuid),
actorRef.address,
timeout,
Right(message),
isOneWay,
senderOption,
typedActorInfo,
actorType,
if (isAuthenticated.compareAndSet(false, true)) RemoteClientSettings.SECURE_COOKIE else None).build, senderFuture)
}
/**
@ -226,19 +237,18 @@ abstract class RemoteClient private[akka] (
throw future.getCause
}
} catch {
case e: Throwable =>
case e: Throwable
// add the request to the tx log after a failing send
notifyListeners(RemoteClientError(e, module, remoteAddress))
if (useTransactionLog) {
if (!pendingRequests.offer((true, null, request)))
throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached")
}
else throw e
} else throw e
}
None
} else {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
@ -248,7 +258,7 @@ abstract class RemoteClient private[akka] (
if (!pendingRequests.offer((false, futureUuid, request))) // Add the request to the tx log after a failing send
throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached")
} else {
val f = futures.remove(futureUuid) // Clean up future
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
}
}
@ -258,10 +268,10 @@ abstract class RemoteClient private[akka] (
// try to send the original one
future = currentChannel.write(RemoteEncoder.encode(request))
future.awaitUninterruptibly()
if (future.isCancelled) futures.remove(futureUuid) // Clean up future
if (future.isCancelled) futures.remove(futureUuid) // Clean up future
else if (!future.isSuccess) handleRequestReplyError(future)
} catch {
case e: Exception => handleRequestReplyError(future)
case e: Exception handleRequestReplyError(future)
}
Some(futureResult)
}
@ -296,7 +306,7 @@ abstract class RemoteClient private[akka] (
}
}
pendingRequests.remove(pendingRequest)
pendingRequest = pendingRequests.peek // try to grab next message
pendingRequest = pendingRequests.peek // try to grab next message
}
}
}
@ -308,17 +318,22 @@ abstract class RemoteClient private[akka] (
*/
class ActiveRemoteClient private[akka] (
module: NettyRemoteClientModule, remoteAddress: InetSocketAddress,
val loader: Option[ClassLoader] = None, notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) {
val loader: Option[ClassLoader] = None, notifyListenersFun: (Any) Unit) extends RemoteClient(module, remoteAddress) {
import RemoteClientSettings._
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile private var bootstrap: ClientBootstrap = _
@volatile private[remote] var connection: ChannelFuture = _
@volatile private[remote] var openChannels: DefaultChannelGroup = _
@volatile private var timer: HashedWheelTimer = _
@volatile private var reconnectionTimeWindowStart = 0L
@volatile
private var bootstrap: ClientBootstrap = _
@volatile
private[remote] var connection: ChannelFuture = _
@volatile
private[remote] var openChannels: DefaultChannelGroup = _
@volatile
private var timer: HashedWheelTimer = _
@volatile
private var reconnectionTimeWindowStart = 0L
def notifyListeners(msg: => Any): Unit = notifyListenersFun(msg)
def notifyListeners(msg: Any): Unit = notifyListenersFun(msg)
def currentChannel = connection.getChannel
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = {
@ -342,9 +357,9 @@ class ActiveRemoteClient private[akka] (
//Add a task that does GCing of expired Futures
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
if(isRunning) {
if (isRunning) {
val i = futures.entrySet.iterator
while(i.hasNext) {
while (i.hasNext) {
val e = i.next
if (e.getValue.isExpired)
futures.remove(e.getKey)
@ -356,8 +371,8 @@ class ActiveRemoteClient private[akka] (
true
}
} match {
case true => true
case false if reconnectIfAlreadyConnected =>
case true true
case false if reconnectIfAlreadyConnected
isAuthenticated.set(false)
openChannels.remove(connection.getChannel)
connection.getChannel.close
@ -367,7 +382,7 @@ class ActiveRemoteClient private[akka] (
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
false
} else true
case false => false
case false false
}
}
@ -408,14 +423,14 @@ class ActiveRemoteClientPipelineFactory(
client: ActiveRemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.length, RemoteClientSettings.READ_TIMEOUT.unit)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.length, RemoteClientSettings.READ_TIMEOUT.unit)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match {
case "zlib" => (new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ => (Nil,Nil)
val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match {
case "zlib" (new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ (Nil, Nil)
}
val remoteClient = new ActiveRemoteClientHandler(name, futures, bootstrap, remoteAddress, timer, client)
@ -440,12 +455,12 @@ class ActiveRemoteClientHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
try {
event.getMessage match {
case arp: AkkaRemoteProtocol if arp.hasInstruction =>
case arp: AkkaRemoteProtocol if arp.hasInstruction
val rcp = arp.getInstruction
rcp.getCommandType match {
case CommandType.SHUTDOWN => spawn { client.module.shutdownClientConnection(remoteAddress) }
case CommandType.SHUTDOWN spawn { client.module.shutdownClientConnection(remoteAddress) }
}
case arp: AkkaRemoteProtocol if arp.hasMessage =>
case arp: AkkaRemoteProtocol if arp.hasMessage
val reply = arp.getMessage
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]]
@ -457,11 +472,11 @@ class ActiveRemoteClientHandler(
} else {
future.completeWithException(parseException(reply, client.loader))
}
case other =>
case other
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress)
}
} catch {
case e: Throwable =>
case e: Throwable
EventHandler.error(e, this, e.getMessage)
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
throw e
@ -487,7 +502,7 @@ class ActiveRemoteClientHandler(
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
client.resetReconnectionTimeWindow
} catch {
case e: Throwable =>
case e: Throwable
EventHandler.error(e, this, e.getMessage)
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
throw e
@ -500,9 +515,9 @@ class ActiveRemoteClientHandler(
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
event.getCause match {
case e: ReadTimeoutException =>
case e: ReadTimeoutException
spawn { client.module.shutdownClientConnection(remoteAddress) }
case e =>
case e
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
event.getChannel.close //FIXME Is this the correct behavior?
}
@ -513,12 +528,12 @@ class ActiveRemoteClientHandler(
val classname = exception.getClassname
try {
val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname)
else Class.forName(classname)
else Class.forName(classname)
exceptionClass
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
case problem: Throwable =>
case problem: Throwable
EventHandler.error(problem, this, problem.getMessage)
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
}
@ -538,8 +553,8 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
protected[akka] def actorFor(address: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
if (optimizeLocalScoped_?) {
val home = this.address
if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort) {//TODO: switch to InetSocketAddress.equals?
val localRef = findActorByAddressOrUuid(address,address)
if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort) { //TODO: switch to InetSocketAddress.equals?
val localRef = findActorByAddressOrUuid(address, address)
if (localRef ne null) return localRef //Code significantly simpler with the return statement
}
}
@ -551,9 +566,9 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
val name = "NettyRemoteServer@" + host + ":" + port
val address = new InetSocketAddress(host,port)
val address = new InetSocketAddress(host, port)
private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool,Executors.newCachedThreadPool)
private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
private val bootstrap = new ServerBootstrap(factory)
@ -586,26 +601,26 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
bootstrap.releaseExternalResources()
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch {
case e: Exception =>
case e: Exception
EventHandler.error(e, this, e.getMessage)
}
}
}
trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
import RemoteServerSettings._
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
def address = currentServer.get match {
case s: Some[NettyRemoteServer] => s.get.address
case None => ReflectiveAccess.RemoteModule.configDefaultAddress
case s: Some[NettyRemoteServer] s.get.address
case None ReflectiveAccess.RemoteModule.configDefaultAddress
}
def name = currentServer.get match {
case s: Some[NettyRemoteServer] => s.get.name
case None =>
val a = ReflectiveAccess.RemoteModule.configDefaultAddress
case s: Some[NettyRemoteServer] s.get.name
case None
val a = ReflectiveAccess.RemoteModule.configDefaultAddress
"NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort
}
@ -619,7 +634,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader)))
}
} catch {
case e: Exception =>
case e: Exception
EventHandler.error(e, this, e.getMessage)
notifyListeners(RemoteServerError(e, this))
}
@ -628,8 +643,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
def shutdownServerModule() = guard withGuard {
_isRunning switchOff {
currentServer.getAndSet(None) foreach {
instance =>
currentServer.getAndSet(None) foreach { instance
instance.shutdown()
}
}
@ -650,8 +664,8 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
* @param id custom actor id
* @param typedActor typed actor to register
*/
def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit = guard withGuard {
registerTypedPerSessionActor(id, () => factory, typedActorsFactories)
def registerTypedPerSessionActor(id: String, factory: AnyRef): Unit = guard withGuard {
registerTypedPerSessionActor(id, () factory, typedActorsFactories)
}
/**
@ -680,11 +694,11 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def registerPerSession(id: String, factory: => ActorRef): Unit = synchronized {
registerPerSession(id, () => factory, actorsFactories)
def registerPerSession(id: String, factory: ActorRef): Unit = synchronized {
registerPerSession(id, () factory, actorsFactories)
}
private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) {
private def registerPerSession[Key](id: Key, factory: () ActorRef, registry: ConcurrentHashMap[Key, () ActorRef]) {
if (_isRunning.isOn)
registry.put(id, factory) //TODO change to putIfAbsent
}
@ -694,7 +708,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
registry.put(id, typedActor) //TODO change to putIfAbsent
}
private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) {
private def registerTypedPerSessionActor[Key](id: Key, factory: () AnyRef, registry: ConcurrentHashMap[Key, () AnyRef]) {
if (_isRunning.isOn)
registry.put(id, factory) //TODO change to putIfAbsent
}
@ -720,7 +734,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
else {
val actorRef = actors get id
actorsByUuid.remove(actorRef.uuid, actorRef)
actors.remove(id,actorRef)
actors.remove(id, actorRef)
}
}
}
@ -741,7 +755,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterTypedActor(id: String):Unit = guard withGuard {
def unregisterTypedActor(id: String): Unit = guard withGuard {
if (_isRunning.isOn) {
if (id.startsWith(UUID_PREFIX)) typedActorsByUuid.remove(id.substring(UUID_PREFIX.length))
else typedActors.remove(id)
@ -749,10 +763,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
}
/**
* Unregister RemoteModule Typed Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
* Unregister RemoteModule Typed Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterTypedPerSessionActor(id: String): Unit =
if (_isRunning.isOn) typedActorsFactories.remove(id)
}
@ -761,20 +775,20 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServerPipelineFactory(
val name: String,
val openChannels: ChannelGroup,
val loader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
val name: String,
val openChannels: ChannelGroup,
val loader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
import RemoteServerSettings._
def getPipeline: ChannelPipeline = {
val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val (enc, dec) = COMPRESSION_SCHEME match {
case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ => (Nil, Nil)
val (enc, dec) = COMPRESSION_SCHEME match {
case "zlib" (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ (Nil, Nil)
}
val execution = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(
@ -782,9 +796,7 @@ class RemoteServerPipelineFactory(
MAX_CHANNEL_MEMORY_SIZE,
MAX_TOTAL_MEMORY_SIZE,
EXECUTION_POOL_KEEPALIVE.length,
EXECUTION_POOL_KEEPALIVE.unit
)
)
EXECUTION_POOL_KEEPALIVE.unit))
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: execution :: remoteServer :: Nil
new StaticChannelPipeline(stages: _*)
@ -796,12 +808,12 @@ class RemoteServerPipelineFactory(
*/
@ChannelHandler.Sharable
class RemoteServerHandler(
val name: String,
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
val name: String,
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
import RemoteServerSettings._
val CHANNEL_INIT = "channel-init".intern
val CHANNEL_INIT = "channel-init".intern
applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY
@ -817,8 +829,8 @@ class RemoteServerHandler(
//Not interesting at the moment
} else if (!future.isSuccess) {
val socketAddress = future.getChannel.getRemoteAddress match {
case i: InetSocketAddress => Some(i)
case _ => None
case i: InetSocketAddress Some(i)
case _ None
}
server.notifyListeners(RemoteServerWriteFailed(payload, future.getCause, server, socketAddress))
}
@ -844,16 +856,20 @@ class RemoteServerHandler(
val clientAddress = getClientAddress(ctx)
// stop all session actors
for (map <- Option(sessionActors.remove(event.getChannel));
actor <- collectionAsScalaIterable(map.values)) {
try { actor ! PoisonPill } catch { case e: Exception => }
for (
map Option(sessionActors.remove(event.getChannel));
actor collectionAsScalaIterable(map.values)
) {
try { actor ! PoisonPill } catch { case e: Exception }
}
//FIXME switch approach or use other thread to execute this
// stop all typed session actors
for (map <- Option(typedSessionActors.remove(event.getChannel));
actor <- collectionAsScalaIterable(map.values)) {
try { TypedActor.stop(actor) } catch { case e: Exception => }
for (
map Option(typedSessionActors.remove(event.getChannel));
actor collectionAsScalaIterable(map.values)
) {
try { TypedActor.stop(actor) } catch { case e: Exception }
}
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
@ -865,13 +881,13 @@ class RemoteServerHandler(
}
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = event.getMessage match {
case null => throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
case null throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
//case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction => RemoteServer cannot receive control messages (yet)
case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasMessage =>
case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasMessage
val requestProtocol = remoteProtocol.getMessage
if (REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx)
handleRemoteMessageProtocol(requestProtocol, event.getChannel)
case _ => //ignore
case _ //ignore
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
@ -881,16 +897,16 @@ class RemoteServerHandler(
private def getClientAddress(ctx: ChannelHandlerContext): Option[InetSocketAddress] =
ctx.getChannel.getRemoteAddress match {
case inet: InetSocketAddress => Some(inet)
case _ => None
case inet: InetSocketAddress Some(inet)
case _ None
}
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
request.getActorInfo.getActorType match {
case SCALA_ACTOR => dispatchToActor(request, channel)
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
case JAVA_ACTOR => throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported")
case other => throw new IllegalActorStateException("Unknown ActorType [" + other + "]")
case SCALA_ACTOR dispatchToActor(request, channel)
case TYPED_ACTOR dispatchToTypedActor(request, channel)
case JAVA_ACTOR throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported")
case other throw new IllegalActorStateException("Unknown ActorType [" + other + "]")
}
}
@ -899,7 +915,7 @@ class RemoteServerHandler(
val actorRef =
try { createActor(actorInfo, channel) } catch {
case e: SecurityException =>
case e: SecurityException
EventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e, request, AkkaActorType.ScalaActor))
server.notifyListeners(RemoteServerError(e, server))
@ -912,14 +928,14 @@ class RemoteServerHandler(
else None
message match { // first match on system messages
case RemoteActorSystemMessage.Stop =>
case RemoteActorSystemMessage.Stop
if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor")
else actorRef.stop()
case _: LifeCycleMessage if (UNTRUSTED_MODE) =>
case _: LifeCycleMessage if (UNTRUSTED_MODE)
throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a LifeCycleMessage to the remote actor")
case _ => // then match on user defined messages
case _ // then match on user defined messages
if (request.getOneWay) actorRef.!(message)(sender)
else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(
message,
@ -927,9 +943,9 @@ class RemoteServerHandler(
None,
Some(new DefaultCompletableFuture[Any](request.getActorInfo.getTimeout).
onComplete(_.value.get match {
case l: Left[Throwable, Any] => write(channel, createErrorReplyMessage(l.a, request, AkkaActorType.ScalaActor))
case r: Right[Throwable, Any] =>
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
case l: Left[Throwable, Any] write(channel, createErrorReplyMessage(l.a, request, AkkaActorType.ScalaActor))
case r: Right[Throwable, Any]
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
actorInfo.getAddress,
@ -945,9 +961,7 @@ class RemoteServerHandler(
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
write(channel, RemoteEncoder.encode(messageBuilder.build))
}
)
))
})))
}
}
@ -967,8 +981,8 @@ class RemoteServerHandler(
//FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo?
val (ownerTypeHint, argClasses, args) =
MessageSerializer
.deserialize(request.getMessage)
.asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]]
.deserialize(request.getMessage)
.asInstanceOf[Tuple3[String, Array[Class[_]], Array[AnyRef]]]
def resolveMethod(bottomType: Class[_],
typeHint: String,
@ -977,14 +991,14 @@ class RemoteServerHandler(
var typeToResolve = bottomType
var targetMethod: java.lang.reflect.Method = null
var firstException: NoSuchMethodException = null
while((typeToResolve ne null) && (targetMethod eq null)) {
while ((typeToResolve ne null) && (targetMethod eq null)) {
if ((typeHint eq null) || typeToResolve.getName.startsWith(typeHint)) {
try {
targetMethod = typeToResolve.getDeclaredMethod(methodName, methodSignature:_*)
targetMethod = typeToResolve.getDeclaredMethod(methodName, methodSignature: _*)
targetMethod.setAccessible(true)
} catch {
case e: NoSuchMethodException =>
case e: NoSuchMethodException
if (firstException eq null)
firstException = e
@ -994,7 +1008,7 @@ class RemoteServerHandler(
typeToResolve = typeToResolve.getSuperclass
}
if((targetMethod eq null) && (firstException ne null))
if ((targetMethod eq null) && (firstException ne null))
throw firstException
targetMethod
@ -1007,7 +1021,7 @@ class RemoteServerHandler(
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) //FIXME execute in non-IO thread
else {
//Sends the response
def sendResponse(result: Either[Throwable,Any]): Unit = try {
def sendResponse(result: Either[Throwable, Any]): Unit = try {
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
None,
Right(request.getUuid),
@ -1023,41 +1037,42 @@ class RemoteServerHandler(
write(channel, RemoteEncoder.encode(messageBuilder.build))
} catch {
case e: Exception =>
case e: Exception
EventHandler.error(e, this, e.getMessage)
server.notifyListeners(RemoteServerError(e, server))
}
messageReceiver.invoke(typedActor, args: _*) match { //TODO execute in non-IO thread
//If it's a future, we can lift on that to defer the send to when the future is completed
case f: Future[_] => f.onComplete( future => sendResponse(future.value.get) )
case other => sendResponse(Right(other))
case f: Future[_] f.onComplete(future sendResponse(future.value.get))
case other sendResponse(Right(other))
}
}
} catch {
case e: Exception =>
case e: Exception
EventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e match {
case e: InvocationTargetException => e.getCause
case e => e
case e: InvocationTargetException e.getCause
case e e
}, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server))
} finally {
}
finally {
//TODO SenderContextInfo.senderActorRef.value = None ?
//TODO SenderContextInfo.senderProxy.value = None ?
}
}
private def findSessionActor(id: String, channel: Channel) : ActorRef =
private def findSessionActor(id: String, channel: Channel): ActorRef =
sessionActors.get(channel) match {
case null => null
case map => map get id
case null null
case map map get id
}
private def findTypedSessionActor(id: String, channel: Channel) : AnyRef =
private def findTypedSessionActor(id: String, channel: Channel): AnyRef =
typedSessionActors.get(channel) match {
case null => null
case map => map get id
case null null
case map map get id
}
/**
@ -1068,16 +1083,16 @@ class RemoteServerHandler(
val address = actorInfo.getAddress
findSessionActor(address, channel) match {
case null => // we dont have it in the session either, see if we have a factory for it
case null // we dont have it in the session either, see if we have a factory for it
server.findActorFactory(address) match {
case null => null
case factory =>
case null null
case factory
val actorRef = factory()
actorRef.uuid = parseUuid(uuid) //FIXME is this sensible?
sessionActors.get(channel).put(address, actorRef)
actorRef.start() //Start it where's it's created
}
case sessionActor => sessionActor
case sessionActor sessionActor
}
}
@ -1093,36 +1108,36 @@ class RemoteServerHandler(
val address = actorInfo.getAddress
server.findActorByAddressOrUuid(address, parseUuid(uuid).toString) match {
// the actor has not been registered globally. See if we have it in the session
case null => createSessionActor(actorInfo, channel)
case actorRef => actorRef
// the actor has not been registered globally. See if we have it in the session
case null createSessionActor(actorInfo, channel)
case actorRef actorRef
}
}
/**
* gets the actor from the session, or creates one if there is a factory for it
*/
private def createTypedSessionActor(actorInfo: ActorInfoProtocol, channel: Channel):AnyRef ={
private def createTypedSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = {
val address = actorInfo.getAddress
findTypedSessionActor(address, channel) match {
case null =>
case null
server.findTypedActorFactory(address) match {
case null => null
case factory =>
case null null
case factory
val newInstance = factory()
typedSessionActors.get(channel).put(address, newInstance)
newInstance
}
case sessionActor => sessionActor
case sessionActor sessionActor
}
}
private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = {
val uuid = actorInfo.getUuid
server.findTypedActorByAddressOrUuid(actorInfo.getAddress, parseUuid(uuid).toString) match {
// the actor has not been registered globally. See if we have it in the session
case null => createTypedSessionActor(actorInfo, channel)
case typedActor => typedActor
// the actor has not been registered globally. See if we have it in the session
case null createTypedSessionActor(actorInfo, channel)
case typedActor typedActor
}
}
@ -1146,8 +1161,8 @@ class RemoteServerHandler(
private def authenticateRemoteClient(request: RemoteMessageProtocol, ctx: ChannelHandlerContext) = {
val attachment = ctx.getAttachment
if ((attachment ne null) &&
attachment.isInstanceOf[String] &&
attachment.asInstanceOf[String] == CHANNEL_INIT) { // is first time around, channel initialization
attachment.isInstanceOf[String] &&
attachment.asInstanceOf[String] == CHANNEL_INIT) { // is first time around, channel initialization
ctx.setAttachment(null)
val clientAddress = ctx.getChannel.getRemoteAddress.toString
if (!request.hasCookie) throw new SecurityException(
@ -1157,12 +1172,12 @@ class RemoteServerHandler(
}
}
protected def parseUuid(protocol: UuidProtocol): Uuid = uuidFrom(protocol.getHigh,protocol.getLow)
protected def parseUuid(protocol: UuidProtocol): Uuid = uuidFrom(protocol.getHigh, protocol.getLow)
}
class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) {
protected val guard = new ReadWriteGuard
protected val open = new AtomicBoolean(true)
protected val open = new AtomicBoolean(true)
override def add(channel: Channel): Boolean = guard withReadGuard {
if (open.get) {