Putting the Netty-stuff in akka.remote.netty and disposing of RemoteClient and RemoteServer

This commit is contained in:
Viktor Klang 2011-01-03 12:42:30 +01:00
parent 8e522f4a03
commit a61e591b2a
13 changed files with 144 additions and 130 deletions

View file

@ -3,8 +3,14 @@
*/ */
package akka.util package akka.util
import java.net.InetSocketAddress
object Address { object Address {
def apply(hostname: String, port: Int) = new Address(hostname, port) def apply(hostname: String, port: Int) = new Address(hostname, port)
def apply(inetAddress: InetSocketAddress): Address = inetAddress match {
case null => null
case inet => new Address(inet.getHostName, inet.getPort)
}
} }
class Address(val hostname: String, val port: Int) { class Address(val hostname: String, val port: Int) {

View file

@ -33,7 +33,7 @@ object ReflectiveAccess extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object Remote { object Remote {
val TRANSPORT = Config.config.getString("akka.remote.layer","akka.remote.NettyRemoteSupport") val TRANSPORT = Config.config.getString("akka.remote.layer","akka.remote.netty.NettyRemoteSupport")
private[akka] val configDefaultAddress = private[akka] val configDefaultAddress =
new InetSocketAddress(Config.config.getString("akka.remote.server.hostname", "localhost"), new InetSocketAddress(Config.config.getString("akka.remote.server.hostname", "localhost"),

View file

@ -23,7 +23,7 @@ trait BootableRemoteActorService extends Bootable with Logging {
def startRemoteService = remoteServerThread.start def startRemoteService = remoteServerThread.start
abstract override def onLoad = { abstract override def onLoad = {
if (RemoteServer.isRemotingEnabled) { if (ReflectiveAccess.isRemotingEnabled && RemoteServerSettings.isRemotingEnabled) {
log.slf4j.info("Initializing Remote Actors Service...") log.slf4j.info("Initializing Remote Actors Service...")
startRemoteService startRemoteService
log.slf4j.info("Remote Actors Service initialized") log.slf4j.info("Remote Actors Service initialized")

View file

@ -0,0 +1,66 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.remote
import akka.util.Duration
import akka.config.Config._
import akka.config.ConfigurationException
object RemoteClientSettings {
val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie", "") match {
case "" => None
case cookie => Some(cookie)
}
val RECONNECTION_TIME_WINDOW = Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT)
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT)
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576)
}
object RemoteServerSettings {
val isRemotingEnabled = config.getList("akka.enabled-modules").exists(_ == "remote")
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576)
val SECURE_COOKIE = config.getString("akka.remote.secure-cookie")
val REQUIRE_COOKIE = {
val requireCookie = config.getBool("akka.remote.server.require-cookie", true)
if (isRemotingEnabled && requireCookie && SECURE_COOKIE.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
requireCookie
}
val UNTRUSTED_MODE = config.getBool("akka.remote.server.untrusted-mode", false)
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 2552)
val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT)
val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib")
val ZLIB_COMPRESSION_LEVEL = {
val level = config.getInt("akka.remote.zlib-compression-level", 6)
if (level < 1 && level > 9) throw new IllegalArgumentException(
"zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed")
level
}
val SECURE = {
/*if (config.getBool("akka.remote.ssl.service",false)) {
val properties = List(
("key-store-type" , "keyStoreType"),
("key-store" , "keyStore"),
("key-store-pass" , "keyStorePassword"),
("trust-store-type", "trustStoreType"),
("trust-store" , "trustStore"),
("trust-store-pass", "trustStorePassword")
).map(x => ("akka.remote.ssl." + x._1, "javax.net.ssl." + x._2))
// If property is not set, and we have a value from our akka.conf, use that value
for {
p <- properties if System.getProperty(p._2) eq null
c <- config.getString(p._1)
} System.setProperty(p._2, c)
if (config.getBool("akka.remote.ssl.debug", false)) System.setProperty("javax.net.debug","ssl")
true
} else */false
}
}

View file

@ -2,20 +2,23 @@
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/ */
package akka.remote package akka.remote.netty
import akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _}
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future} import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future}
import akka.remote.protocol.RemoteProtocol._ import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._ import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.serialization.RemoteActorSerialization import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
import akka.japi.Creator import akka.japi.Creator
import akka.config.Config._ import akka.config.Config._
import akka.serialization.RemoteActorSerialization._ import akka.remoteinterface._
import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.AkkaException import akka.AkkaException
import akka.actor.Actor._ import akka.actor.Actor._
import akka.util._ import akka.util._
import akka.remote.MessageSerializer
import akka.remote.{RemoteClientSettings, RemoteServerSettings}
import org.jboss.netty.channel._ import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup} import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup}
@ -31,17 +34,11 @@ import org.jboss.netty.handler.ssl.SslHandler
import java.net.{ SocketAddress, InetSocketAddress } import java.net.{ SocketAddress, InetSocketAddress }
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet } import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
import scala.collection.mutable.{ HashSet, HashMap } import scala.collection.mutable.{ HashMap }
import scala.reflect.BeanProperty import scala.reflect.BeanProperty
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean} import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean}
import akka.remoteinterface._
import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
/**
* The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement with Logging => trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement with Logging =>
private val remoteClients = new HashMap[Address, RemoteClient] private val remoteClients = new HashMap[Address, RemoteClient]
private val remoteActors = new Index[Address, Uuid] private val remoteActors = new Index[Address, Uuid]
@ -60,16 +57,16 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
typedActorInfo: Option[Tuple2[String, String]], typedActorInfo: Option[Tuple2[String, String]],
actorType: AkkaActorType, actorType: AkkaActorType,
loader: Option[ClassLoader]): Option[CompletableFuture[T]] = loader: Option[ClassLoader]): Option[CompletableFuture[T]] =
clientFor(remoteAddress, loader).send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType) withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType))
private[akka] def clientFor( private[akka] def withClientFor[T](
address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = { address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient => T): T = {
loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY
val key = makeKey(address) val key = Address(address)
lock.readLock.lock lock.readLock.lock
try { try {
remoteClients.get(key) match { val c = remoteClients.get(key) match {
case Some(client) => client case Some(client) => client
case None => case None =>
lock.readLock.unlock lock.readLock.unlock
@ -87,33 +84,29 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
} finally { lock.readLock.lock } //downgrade } finally { lock.readLock.lock } //downgrade
} finally { lock.writeLock.unlock } } finally { lock.writeLock.unlock }
} }
fun(c)
} finally { lock.readLock.unlock } } finally { lock.readLock.unlock }
} }
private def makeKey(a: InetSocketAddress): Address = a match {
case null => null
case address => Address(address.getHostName,address.getPort)
}
def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard { def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard {
remoteClients.remove(makeKey(address)) match { remoteClients.remove(Address(address)) match {
case Some(client) => client.shutdown case Some(client) => client.shutdown
case None => false case None => false
} }
} }
def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard { def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard {
remoteClients.get(makeKey(address)) match { remoteClients.get(Address(address)) match {
case Some(client) => client.connect(reconnectIfAlreadyConnected = true) case Some(client) => client.connect(reconnectIfAlreadyConnected = true)
case None => false case None => false
} }
} }
private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
clientFor(actorRef.homeAddress.get, None).registerSupervisorForActor(actorRef) withClientFor(actorRef.homeAddress.get, None)(_.registerSupervisorForActor(actorRef))
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = lock withReadGuard { private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = lock withReadGuard {
remoteClients.get(makeKey(actorRef.homeAddress.get)) match { remoteClients.get(Address(actorRef.homeAddress.get)) match {
case Some(client) => client.deregisterSupervisorForActor(actorRef) case Some(client) => client.deregisterSupervisorForActor(actorRef)
case None => actorRef case None => actorRef
} }
@ -143,17 +136,11 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
} }
} }
object RemoteClient { /**
val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie", "") match { * This is the abstract baseclass for netty remote clients,
case "" => None * currently there's only an ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that
case cookie => Some(cookie) * reuses an already established connection.
} */
val RECONNECTION_TIME_WINDOW = Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT)
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT)
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576)
}
abstract class RemoteClient private[akka] ( abstract class RemoteClient private[akka] (
val module: NettyRemoteClientModule, val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) extends Logging { val remoteAddress: InetSocketAddress) extends Logging {
@ -165,14 +152,27 @@ abstract class RemoteClient private[akka] (
private[remote] val runSwitch = new Switch() private[remote] val runSwitch = new Switch()
private[remote] val isAuthenticated = new AtomicBoolean(false) private[remote] val isAuthenticated = new AtomicBoolean(false)
/**
* Is this client currently running?
*/
private[remote] def isRunning = runSwitch.isOn private[remote] def isRunning = runSwitch.isOn
protected def notifyListeners(msg: => Any); Unit protected def notifyListeners(msg: => Any); Unit
protected def currentChannel: Channel protected def currentChannel: Channel
/**
* Pretty self explanatory?
*/
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean
/**
* Shuts this client down and releases any resources attached
*/
def shutdown: Boolean def shutdown: Boolean
/**
* Converts the message to the wireprotocol and sends the message across the wire
*/
def send[T]( def send[T](
message: Any, message: Any,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
@ -194,10 +194,13 @@ abstract class RemoteClient private[akka] (
senderOption, senderOption,
typedActorInfo, typedActorInfo,
actorType, actorType,
if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE else None if (isAuthenticated.compareAndSet(false, true)) RemoteClientSettings.SECURE_COOKIE else None
).build, senderFuture) ).build, senderFuture)
} }
/**
* Sends the message across the wire
*/
def send[T]( def send[T](
request: RemoteMessageProtocol, request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
@ -251,7 +254,7 @@ abstract class RemoteClient private[akka] (
} }
/** /**
* RemoteClient represents a connection to a RemoteServer. Is used to send messages to remote actors on the RemoteServer. * RemoteClient represents a connection to an Akka node. Is used to send messages to remote actors on the node.
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
@ -260,7 +263,7 @@ class ActiveRemoteClient private[akka] (
remoteAddress: InetSocketAddress, remoteAddress: InetSocketAddress,
val loader: Option[ClassLoader] = None, val loader: Option[ClassLoader] = None,
notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) { notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) {
import RemoteClient._ import RemoteClientSettings._
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile private var bootstrap: ClientBootstrap = _ @volatile private var bootstrap: ClientBootstrap = _
@volatile private[remote] var connection: ChannelFuture = _ @volatile private[remote] var connection: ChannelFuture = _
@ -366,14 +369,14 @@ class ActiveRemoteClientPipelineFactory(
e e
} }
val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join() val ssl = if (RemoteServerSettings.SECURE) join(new SslHandler(engine)) else join()
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClient.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4) val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance) val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder val protobufEnc = new ProtobufEncoder
val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) case "zlib" => (join(new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
case _ => (join(), join()) case _ => (join(), join())
} }
@ -452,7 +455,7 @@ class ActiveRemoteClientHandler(
client.openChannels.remove(event.getChannel) client.openChannels.remove(event.getChannel)
client.connect(reconnectIfAlreadyConnected = true) client.connect(reconnectIfAlreadyConnected = true)
} }
}, RemoteClient.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) }, RemoteClientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS)
} else spawn { client.shutdown } } else spawn { client.shutdown }
} }
@ -463,7 +466,7 @@ class ActiveRemoteClientHandler(
client.resetReconnectionTimeWindow client.resetReconnectionTimeWindow
} }
if (RemoteServer.SECURE) { if (RemoteServerSettings.SECURE) {
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
sslHandler.handshake.addListener(new ChannelFutureListener { sslHandler.handshake.addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = { def operationComplete(future: ChannelFuture): Unit = {
@ -507,58 +510,6 @@ class ActiveRemoteClientHandler(
} }
} }
/**
* For internal use only. Holds configuration variables, remote actors, remote typed actors and remote servers.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteServer {
val isRemotingEnabled = config.getList("akka.enabled-modules").exists(_ == "remote")
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576)
val SECURE_COOKIE = config.getString("akka.remote.secure-cookie")
val REQUIRE_COOKIE = {
val requireCookie = config.getBool("akka.remote.server.require-cookie", true)
if (isRemotingEnabled && requireCookie && RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
requireCookie
}
val UNTRUSTED_MODE = config.getBool("akka.remote.server.untrusted-mode", false)
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 2552)
val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT)
val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib")
val ZLIB_COMPRESSION_LEVEL = {
val level = config.getInt("akka.remote.zlib-compression-level", 6)
if (level < 1 && level > 9) throw new IllegalArgumentException(
"zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed")
level
}
val SECURE = {
/*if (config.getBool("akka.remote.ssl.service",false)) {
val properties = List(
("key-store-type" , "keyStoreType"),
("key-store" , "keyStore"),
("key-store-pass" , "keyStorePassword"),
("trust-store-type", "trustStoreType"),
("trust-store" , "trustStore"),
("trust-store-pass", "trustStorePassword")
).map(x => ("akka.remote.ssl." + x._1, "javax.net.ssl." + x._2))
// If property is not set, and we have a value from our akka.conf, use that value
for {
p <- properties if System.getProperty(p._2) eq null
c <- config.getString(p._1)
} System.setProperty(p._2, c)
if (config.getBool("akka.remote.ssl.debug", false)) System.setProperty("javax.net.debug","ssl")
true
} else */false
}
}
/** /**
* Provides the implementation of the Netty remote support * Provides the implementation of the Netty remote support
*/ */
@ -612,7 +563,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true) bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis) bootstrap.setOption("child.connectTimeoutMillis", RemoteServerSettings.CONNECTION_TIMEOUT_MILLIS.toMillis)
openChannels.add(bootstrap.bind(address)) openChannels.add(bootstrap.bind(address))
serverModule.notifyListeners(RemoteServerStarted(serverModule)) serverModule.notifyListeners(RemoteServerStarted(serverModule))
@ -631,7 +582,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
} }
trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
import RemoteServer._ import RemoteServerSettings._
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
def address = currentServer.get match { def address = currentServer.get match {
@ -828,7 +779,7 @@ class RemoteServerPipelineFactory(
val openChannels: ChannelGroup, val openChannels: ChannelGroup,
val loader: Option[ClassLoader], val loader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends ChannelPipelineFactory { val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
import RemoteServer._ import RemoteServerSettings._
def getPipeline: ChannelPipeline = { def getPipeline: ChannelPipeline = {
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*) def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*)
@ -840,13 +791,13 @@ class RemoteServerPipelineFactory(
e e
} }
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() val ssl = if(SECURE) join(new SslHandler(engine)) else join()
val lenDec = new LengthFieldBasedFrameDecoder(RemoteServer.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4) val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance) val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder val protobufEnc = new ProtobufEncoder
val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { val (enc, dec) = COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) case "zlib" => (join(new ZlibEncoder(ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
case _ => (join(), join()) case _ => (join(), join())
} }
@ -865,7 +816,7 @@ class RemoteServerHandler(
val openChannels: ChannelGroup, val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader], val applicationLoader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler with Logging { val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler with Logging {
import RemoteServer._ import RemoteServerSettings._
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
val CHANNEL_INIT = "channel-init".intern val CHANNEL_INIT = "channel-init".intern
@ -876,7 +827,7 @@ class RemoteServerHandler(
applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY
/** /**
* ChannelOpen overridden to store open channels for a clean postStop of a RemoteServer. * ChannelOpen overridden to store open channels for a clean postStop of a node.
* If a channel is closed before, it is automatically removed from the open channels group. * If a channel is closed before, it is automatically removed from the open channels group.
*/ */
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel) override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel)
@ -886,7 +837,7 @@ class RemoteServerHandler(
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()) sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]()) typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]())
log.slf4j.debug("Remote client [{}] connected to [{}]", clientAddress, server.name) log.slf4j.debug("Remote client [{}] connected to [{}]", clientAddress, server.name)
if (RemoteServer.SECURE) { if (SECURE) {
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
// Begin handshake. // Begin handshake.
sslHandler.handshake().addListener(new ChannelFutureListener { sslHandler.handshake().addListener(new ChannelFutureListener {
@ -900,7 +851,7 @@ class RemoteServerHandler(
} else { } else {
server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
} }
if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication if (REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication
} }
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
@ -942,7 +893,7 @@ class RemoteServerHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = event.getMessage match { override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = event.getMessage match {
case null => throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event) case null => throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
case requestProtocol: RemoteMessageProtocol => case requestProtocol: RemoteMessageProtocol =>
if (RemoteServer.REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx) if (REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx)
handleRemoteMessageProtocol(requestProtocol, event.getChannel) handleRemoteMessageProtocol(requestProtocol, event.getChannel)
case _ => //ignore case _ => //ignore
} }
@ -990,9 +941,9 @@ class RemoteServerHandler(
message match { // first match on system messages message match { // first match on system messages
case RemoteActorSystemMessage.Stop => case RemoteActorSystemMessage.Stop =>
if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException("Remote server is operating is untrusted mode, can not stop the actor") if (UNTRUSTED_MODE) throw new SecurityException("Remote server is operating is untrusted mode, can not stop the actor")
else actorRef.stop else actorRef.stop
case _: LifeCycleMessage if (RemoteServer.UNTRUSTED_MODE) => case _: LifeCycleMessage if (UNTRUSTED_MODE) =>
throw new SecurityException("Remote server is operating is untrusted mode, can not pass on a LifeCycleMessage to the remote actor") throw new SecurityException("Remote server is operating is untrusted mode, can not pass on a LifeCycleMessage to the remote actor")
case _ => // then match on user defined messages case _ => // then match on user defined messages
@ -1143,7 +1094,7 @@ class RemoteServerHandler(
val name = actorInfo.getTarget val name = actorInfo.getTarget
try { try {
if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( if (UNTRUSTED_MODE) throw new SecurityException(
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
log.slf4j.info("Creating a new client-managed remote actor [{}:{}]", name, uuid) log.slf4j.info("Creating a new client-managed remote actor [{}:{}]", name, uuid)
@ -1216,7 +1167,7 @@ class RemoteServerHandler(
val uuid = actorInfo.getUuid val uuid = actorInfo.getUuid
try { try {
if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( if (UNTRUSTED_MODE) throw new SecurityException(
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
log.slf4j.info("Creating a new remote typed actor:\n\t[{} :: {}]", interfaceClassname, targetClassname) log.slf4j.info("Creating a new remote typed actor:\n\t[{} :: {}]", interfaceClassname, targetClassname)
@ -1285,7 +1236,7 @@ class RemoteServerHandler(
val clientAddress = ctx.getChannel.getRemoteAddress.toString val clientAddress = ctx.getChannel.getRemoteAddress.toString
if (!request.hasCookie) throw new SecurityException( if (!request.hasCookie) throw new SecurityException(
"The remote client [" + clientAddress + "] does not have a secure cookie.") "The remote client [" + clientAddress + "] does not have a secure cookie.")
if (!(request.getCookie == RemoteServer.SECURE_COOKIE.get)) throw new SecurityException( if (!(request.getCookie == SECURE_COOKIE.get)) throw new SecurityException(
"The remote client [" + clientAddress + "] secure cookie is not the same as remote server secure cookie") "The remote client [" + clientAddress + "] secure cookie is not the same as remote server secure cookie")
log.slf4j.info("Remote client [{}] successfully authenticated using secure cookie", clientAddress) log.slf4j.info("Remote client [{}] successfully authenticated using secure cookie", clientAddress)
} }

View file

@ -5,7 +5,6 @@
package akka.serialization package akka.serialization
import akka.dispatch.MessageInvocation import akka.dispatch.MessageInvocation
import akka.remote.{RemoteServer, RemoteClient, MessageSerializer}
import akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _} import akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _}
import ActorTypeProtocol._ import ActorTypeProtocol._
@ -18,6 +17,7 @@ import scala.collection.immutable.Stack
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
import akka.util.ReflectiveAccess import akka.util.ReflectiveAccess
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.remote. {RemoteClientSettings, MessageSerializer}
/** /**
* Type class definition for Actor Serialization * Type class definition for Actor Serialization
@ -140,7 +140,7 @@ object ActorSerialization {
actorRef.getSender, actorRef.getSender,
None, None,
ActorType.ScalaActor, ActorType.ScalaActor,
RemoteClient.SECURE_COOKIE).build) RemoteClientSettings.SECURE_COOKIE).build)
requestProtocols.foreach(rp => builder.addMessages(rp)) requestProtocols.foreach(rp => builder.addMessages(rp))
} }

View file

@ -5,7 +5,7 @@ import org.scalatest.matchers.MustMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith import org.junit.runner.RunWith
import akka.remote.NettyRemoteSupport import akka.remote.netty.NettyRemoteSupport
import akka.actor. {Actor, ActorRegistry} import akka.actor. {Actor, ActorRegistry}
import java.util.concurrent. {TimeUnit, CountDownLatch} import java.util.concurrent. {TimeUnit, CountDownLatch}

View file

@ -8,7 +8,6 @@ import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith import org.junit.runner.RunWith
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.remote. {NettyRemoteSupport, RemoteServer, RemoteClient}
import akka.actor.Actor._ import akka.actor.Actor._
import akka.actor._ import akka.actor._

View file

@ -7,7 +7,6 @@ package akka.actor.remote
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
import akka.serialization.BinaryString import akka.serialization.BinaryString
import akka.config.Supervision._ import akka.config.Supervision._
import akka.remote.{RemoteServer, RemoteClient}
import akka.OneWay import akka.OneWay
import org.scalatest._ import org.scalatest._
import org.scalatest.WordSpec import org.scalatest.WordSpec

View file

@ -4,7 +4,6 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import akka.actor.Actor._ import akka.actor.Actor._
import akka.actor.{ActorRegistry, ActorRef, Actor} import akka.actor.{ActorRegistry, ActorRef, Actor}
import akka.remote. {NettyRemoteSupport}
object ServerInitiatedRemoteActorSpec { object ServerInitiatedRemoteActorSpec {
case class Send(actor: ActorRef) case class Send(actor: ActorRef)

View file

@ -6,8 +6,8 @@ package akka.actor.remote
import akka.actor._ import akka.actor._
import akka.actor.Actor._ import akka.actor.Actor._
import akka.remote.NettyRemoteSupport
import java.util.concurrent. {ConcurrentSkipListSet, TimeUnit} import java.util.concurrent. {ConcurrentSkipListSet, TimeUnit}
import akka.remote.netty.NettyRemoteSupport
object ServerInitiatedRemoteSessionActorSpec { object ServerInitiatedRemoteSessionActorSpec {

View file

@ -6,7 +6,6 @@ package akka.actor.remote
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import akka.remote.{RemoteServer, RemoteClient}
import akka.actor._ import akka.actor._
import RemoteTypedActorLog._ import RemoteTypedActorLog._

View file

@ -1,13 +1,8 @@
package akka.actor.serialization package akka.actor.serialization
import java.util.concurrent.TimeUnit
import org.scalatest._
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.{ProtobufProtocol, Actor} import akka.actor.{ProtobufProtocol, Actor}
import ProtobufProtocol.ProtobufPOJO import ProtobufProtocol.ProtobufPOJO
import Actor._ import Actor._
import akka.remote.NettyRemoteSupport
import akka.actor.remote.AkkaRemoteTest import akka.actor.remote.AkkaRemoteTest
/* --------------------------- /* ---------------------------