diff --git a/akka-actor/src/main/scala/akka/util/Address.scala b/akka-actor/src/main/scala/akka/util/Address.scala index 66af8aec6a..4e1749c560 100644 --- a/akka-actor/src/main/scala/akka/util/Address.scala +++ b/akka-actor/src/main/scala/akka/util/Address.scala @@ -3,8 +3,14 @@ */ package akka.util +import java.net.InetSocketAddress + object Address { def apply(hostname: String, port: Int) = new Address(hostname, port) + def apply(inetAddress: InetSocketAddress): Address = inetAddress match { + case null => null + case inet => new Address(inet.getHostName, inet.getPort) + } } class Address(val hostname: String, val port: Int) { diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 606044a66e..cc657bca3b 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -33,7 +33,7 @@ object ReflectiveAccess extends Logging { * @author Jonas Bonér */ object Remote { - val TRANSPORT = Config.config.getString("akka.remote.layer","akka.remote.NettyRemoteSupport") + val TRANSPORT = Config.config.getString("akka.remote.layer","akka.remote.netty.NettyRemoteSupport") private[akka] val configDefaultAddress = new InetSocketAddress(Config.config.getString("akka.remote.server.hostname", "localhost"), diff --git a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala index 7951dd25f9..c0aa2218dd 100644 --- a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala +++ b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala @@ -23,7 +23,7 @@ trait BootableRemoteActorService extends Bootable with Logging { def startRemoteService = remoteServerThread.start abstract override def onLoad = { - if (RemoteServer.isRemotingEnabled) { + if (ReflectiveAccess.isRemotingEnabled && RemoteServerSettings.isRemotingEnabled) { log.slf4j.info("Initializing Remote Actors Service...") startRemoteService log.slf4j.info("Remote Actors Service initialized") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala new file mode 100644 index 0000000000..57a28f5c21 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.remote + +import akka.util.Duration +import akka.config.Config._ +import akka.config.ConfigurationException + +object RemoteClientSettings { + val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie", "") match { + case "" => None + case cookie => Some(cookie) + } + val RECONNECTION_TIME_WINDOW = Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis + val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) + val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) + val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576) +} + +object RemoteServerSettings { + val isRemotingEnabled = config.getList("akka.enabled-modules").exists(_ == "remote") + val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576) + val SECURE_COOKIE = config.getString("akka.remote.secure-cookie") + val REQUIRE_COOKIE = { + val requireCookie = config.getBool("akka.remote.server.require-cookie", true) + if (isRemotingEnabled && requireCookie && SECURE_COOKIE.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") + requireCookie + } + + val UNTRUSTED_MODE = config.getBool("akka.remote.server.untrusted-mode", false) + val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") + val PORT = config.getInt("akka.remote.server.port", 2552) + val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT) + val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib") + val ZLIB_COMPRESSION_LEVEL = { + val level = config.getInt("akka.remote.zlib-compression-level", 6) + if (level < 1 && level > 9) throw new IllegalArgumentException( + "zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed") + level + } + + val SECURE = { + /*if (config.getBool("akka.remote.ssl.service",false)) { + val properties = List( + ("key-store-type" , "keyStoreType"), + ("key-store" , "keyStore"), + ("key-store-pass" , "keyStorePassword"), + ("trust-store-type", "trustStoreType"), + ("trust-store" , "trustStore"), + ("trust-store-pass", "trustStorePassword") + ).map(x => ("akka.remote.ssl." + x._1, "javax.net.ssl." + x._2)) + + // If property is not set, and we have a value from our akka.conf, use that value + for { + p <- properties if System.getProperty(p._2) eq null + c <- config.getString(p._1) + } System.setProperty(p._2, c) + + if (config.getBool("akka.remote.ssl.debug", false)) System.setProperty("javax.net.debug","ssl") + true + } else */false + } +} diff --git a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala similarity index 88% rename from akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala rename to akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index dd670b5bd4..00eb9910fa 100644 --- a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -2,20 +2,23 @@ * Copyright (C) 2009-2011 Scalable Solutions AB */ -package akka.remote +package akka.remote.netty -import akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _} import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future} import akka.remote.protocol.RemoteProtocol._ import akka.remote.protocol.RemoteProtocol.ActorType._ import akka.config.ConfigurationException import akka.serialization.RemoteActorSerialization +import akka.serialization.RemoteActorSerialization._ import akka.japi.Creator import akka.config.Config._ -import akka.serialization.RemoteActorSerialization._ +import akka.remoteinterface._ +import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} import akka.AkkaException import akka.actor.Actor._ import akka.util._ +import akka.remote.MessageSerializer +import akka.remote.{RemoteClientSettings, RemoteServerSettings} import org.jboss.netty.channel._ import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup} @@ -31,17 +34,11 @@ import org.jboss.netty.handler.ssl.SslHandler import java.net.{ SocketAddress, InetSocketAddress } import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet } -import scala.collection.mutable.{ HashSet, HashMap } +import scala.collection.mutable.{ HashMap } import scala.reflect.BeanProperty import java.lang.reflect.InvocationTargetException import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean} -import akka.remoteinterface._ -import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} -/** - * The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles. - * - * @author Jonas Bonér - */ + trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement with Logging => private val remoteClients = new HashMap[Address, RemoteClient] private val remoteActors = new Index[Address, Uuid] @@ -60,16 +57,16 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem typedActorInfo: Option[Tuple2[String, String]], actorType: AkkaActorType, loader: Option[ClassLoader]): Option[CompletableFuture[T]] = - clientFor(remoteAddress, loader).send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType) + withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)) - private[akka] def clientFor( - address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = { + private[akka] def withClientFor[T]( + address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient => T): T = { loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY - val key = makeKey(address) + val key = Address(address) lock.readLock.lock try { - remoteClients.get(key) match { + val c = remoteClients.get(key) match { case Some(client) => client case None => lock.readLock.unlock @@ -87,33 +84,29 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem } finally { lock.readLock.lock } //downgrade } finally { lock.writeLock.unlock } } + fun(c) } finally { lock.readLock.unlock } } - private def makeKey(a: InetSocketAddress): Address = a match { - case null => null - case address => Address(address.getHostName,address.getPort) - } - def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard { - remoteClients.remove(makeKey(address)) match { + remoteClients.remove(Address(address)) match { case Some(client) => client.shutdown case None => false } } def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard { - remoteClients.get(makeKey(address)) match { + remoteClients.get(Address(address)) match { case Some(client) => client.connect(reconnectIfAlreadyConnected = true) case None => false } } private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = - clientFor(actorRef.homeAddress.get, None).registerSupervisorForActor(actorRef) + withClientFor(actorRef.homeAddress.get, None)(_.registerSupervisorForActor(actorRef)) private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = lock withReadGuard { - remoteClients.get(makeKey(actorRef.homeAddress.get)) match { + remoteClients.get(Address(actorRef.homeAddress.get)) match { case Some(client) => client.deregisterSupervisorForActor(actorRef) case None => actorRef } @@ -143,17 +136,11 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem } } -object RemoteClient { - val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie", "") match { - case "" => None - case cookie => Some(cookie) - } - val RECONNECTION_TIME_WINDOW = Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis - val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) - val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) - val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576) -} - +/** + * This is the abstract baseclass for netty remote clients, + * currently there's only an ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that + * reuses an already established connection. + */ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) extends Logging { @@ -165,14 +152,27 @@ abstract class RemoteClient private[akka] ( private[remote] val runSwitch = new Switch() private[remote] val isAuthenticated = new AtomicBoolean(false) + /** + * Is this client currently running? + */ private[remote] def isRunning = runSwitch.isOn protected def notifyListeners(msg: => Any); Unit protected def currentChannel: Channel + /** + * Pretty self explanatory? + */ def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean + + /** + * Shuts this client down and releases any resources attached + */ def shutdown: Boolean + /** + * Converts the message to the wireprotocol and sends the message across the wire + */ def send[T]( message: Any, senderOption: Option[ActorRef], @@ -194,10 +194,13 @@ abstract class RemoteClient private[akka] ( senderOption, typedActorInfo, actorType, - if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE else None + if (isAuthenticated.compareAndSet(false, true)) RemoteClientSettings.SECURE_COOKIE else None ).build, senderFuture) } + /** + * Sends the message across the wire + */ def send[T]( request: RemoteMessageProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { @@ -251,7 +254,7 @@ abstract class RemoteClient private[akka] ( } /** - * RemoteClient represents a connection to a RemoteServer. Is used to send messages to remote actors on the RemoteServer. + * RemoteClient represents a connection to an Akka node. Is used to send messages to remote actors on the node. * * @author Jonas Bonér */ @@ -260,7 +263,7 @@ class ActiveRemoteClient private[akka] ( remoteAddress: InetSocketAddress, val loader: Option[ClassLoader] = None, notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) { - import RemoteClient._ + import RemoteClientSettings._ //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @volatile private var bootstrap: ClientBootstrap = _ @volatile private[remote] var connection: ChannelFuture = _ @@ -366,14 +369,14 @@ class ActiveRemoteClientPipelineFactory( e } - val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join() - val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) - val lenDec = new LengthFieldBasedFrameDecoder(RemoteClient.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) + val ssl = if (RemoteServerSettings.SECURE) join(new SslHandler(engine)) else join() + val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt) + val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) + val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match { + case "zlib" => (join(new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) case _ => (join(), join()) } @@ -452,7 +455,7 @@ class ActiveRemoteClientHandler( client.openChannels.remove(event.getChannel) client.connect(reconnectIfAlreadyConnected = true) } - }, RemoteClient.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) + }, RemoteClientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) } else spawn { client.shutdown } } @@ -463,7 +466,7 @@ class ActiveRemoteClientHandler( client.resetReconnectionTimeWindow } - if (RemoteServer.SECURE) { + if (RemoteServerSettings.SECURE) { val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) sslHandler.handshake.addListener(new ChannelFutureListener { def operationComplete(future: ChannelFuture): Unit = { @@ -507,58 +510,6 @@ class ActiveRemoteClientHandler( } } -/** - * For internal use only. Holds configuration variables, remote actors, remote typed actors and remote servers. - * - * @author Jonas Bonér - */ -object RemoteServer { - val isRemotingEnabled = config.getList("akka.enabled-modules").exists(_ == "remote") - val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576) - val SECURE_COOKIE = config.getString("akka.remote.secure-cookie") - val REQUIRE_COOKIE = { - val requireCookie = config.getBool("akka.remote.server.require-cookie", true) - if (isRemotingEnabled && requireCookie && RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException( - "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") - requireCookie - } - - val UNTRUSTED_MODE = config.getBool("akka.remote.server.untrusted-mode", false) - val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") - val PORT = config.getInt("akka.remote.server.port", 2552) - val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT) - val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib") - val ZLIB_COMPRESSION_LEVEL = { - val level = config.getInt("akka.remote.zlib-compression-level", 6) - if (level < 1 && level > 9) throw new IllegalArgumentException( - "zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed") - level - } - - val SECURE = { - /*if (config.getBool("akka.remote.ssl.service",false)) { - val properties = List( - ("key-store-type" , "keyStoreType"), - ("key-store" , "keyStore"), - ("key-store-pass" , "keyStorePassword"), - ("trust-store-type", "trustStoreType"), - ("trust-store" , "trustStore"), - ("trust-store-pass", "trustStorePassword") - ).map(x => ("akka.remote.ssl." + x._1, "javax.net.ssl." + x._2)) - - // If property is not set, and we have a value from our akka.conf, use that value - for { - p <- properties if System.getProperty(p._2) eq null - c <- config.getString(p._1) - } System.setProperty(p._2, c) - - if (config.getBool("akka.remote.ssl.debug", false)) System.setProperty("javax.net.debug","ssl") - true - } else */false - } -} - - /** * Provides the implementation of the Netty remote support */ @@ -612,7 +563,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) bootstrap.setOption("child.reuseAddress", true) - bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis) + bootstrap.setOption("child.connectTimeoutMillis", RemoteServerSettings.CONNECTION_TIMEOUT_MILLIS.toMillis) openChannels.add(bootstrap.bind(address)) serverModule.notifyListeners(RemoteServerStarted(serverModule)) @@ -631,7 +582,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, } trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => - import RemoteServer._ + import RemoteServerSettings._ private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) def address = currentServer.get match { @@ -828,7 +779,7 @@ class RemoteServerPipelineFactory( val openChannels: ChannelGroup, val loader: Option[ClassLoader], val server: NettyRemoteServerModule) extends ChannelPipelineFactory { - import RemoteServer._ + import RemoteServerSettings._ def getPipeline: ChannelPipeline = { def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*) @@ -840,13 +791,13 @@ class RemoteServerPipelineFactory( e } - val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() - val lenDec = new LengthFieldBasedFrameDecoder(RemoteServer.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) + val ssl = if(SECURE) join(new SslHandler(engine)) else join() + val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) + val (enc, dec) = COMPRESSION_SCHEME match { + case "zlib" => (join(new ZlibEncoder(ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) case _ => (join(), join()) } @@ -865,7 +816,7 @@ class RemoteServerHandler( val openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader], val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler with Logging { - import RemoteServer._ + import RemoteServerSettings._ val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val CHANNEL_INIT = "channel-init".intern @@ -876,7 +827,7 @@ class RemoteServerHandler( applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY /** - * ChannelOpen overridden to store open channels for a clean postStop of a RemoteServer. + * ChannelOpen overridden to store open channels for a clean postStop of a node. * If a channel is closed before, it is automatically removed from the open channels group. */ override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel) @@ -886,7 +837,7 @@ class RemoteServerHandler( sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()) typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]()) log.slf4j.debug("Remote client [{}] connected to [{}]", clientAddress, server.name) - if (RemoteServer.SECURE) { + if (SECURE) { val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) // Begin handshake. sslHandler.handshake().addListener(new ChannelFutureListener { @@ -900,7 +851,7 @@ class RemoteServerHandler( } else { server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) } - if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication + if (REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { @@ -942,7 +893,7 @@ class RemoteServerHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = event.getMessage match { case null => throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event) case requestProtocol: RemoteMessageProtocol => - if (RemoteServer.REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx) + if (REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx) handleRemoteMessageProtocol(requestProtocol, event.getChannel) case _ => //ignore } @@ -990,9 +941,9 @@ class RemoteServerHandler( message match { // first match on system messages case RemoteActorSystemMessage.Stop => - if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException("Remote server is operating is untrusted mode, can not stop the actor") + if (UNTRUSTED_MODE) throw new SecurityException("Remote server is operating is untrusted mode, can not stop the actor") else actorRef.stop - case _: LifeCycleMessage if (RemoteServer.UNTRUSTED_MODE) => + case _: LifeCycleMessage if (UNTRUSTED_MODE) => throw new SecurityException("Remote server is operating is untrusted mode, can not pass on a LifeCycleMessage to the remote actor") case _ => // then match on user defined messages @@ -1143,7 +1094,7 @@ class RemoteServerHandler( val name = actorInfo.getTarget try { - if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( + if (UNTRUSTED_MODE) throw new SecurityException( "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") log.slf4j.info("Creating a new client-managed remote actor [{}:{}]", name, uuid) @@ -1216,7 +1167,7 @@ class RemoteServerHandler( val uuid = actorInfo.getUuid try { - if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( + if (UNTRUSTED_MODE) throw new SecurityException( "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") log.slf4j.info("Creating a new remote typed actor:\n\t[{} :: {}]", interfaceClassname, targetClassname) @@ -1285,7 +1236,7 @@ class RemoteServerHandler( val clientAddress = ctx.getChannel.getRemoteAddress.toString if (!request.hasCookie) throw new SecurityException( "The remote client [" + clientAddress + "] does not have a secure cookie.") - if (!(request.getCookie == RemoteServer.SECURE_COOKIE.get)) throw new SecurityException( + if (!(request.getCookie == SECURE_COOKIE.get)) throw new SecurityException( "The remote client [" + clientAddress + "] secure cookie is not the same as remote server secure cookie") log.slf4j.info("Remote client [{}] successfully authenticated using secure cookie", clientAddress) } diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index f3b110958a..5f351ccbb1 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -5,7 +5,6 @@ package akka.serialization import akka.dispatch.MessageInvocation -import akka.remote.{RemoteServer, RemoteClient, MessageSerializer} import akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _} import ActorTypeProtocol._ @@ -18,6 +17,7 @@ import scala.collection.immutable.Stack import com.google.protobuf.ByteString import akka.util.ReflectiveAccess import java.net.InetSocketAddress +import akka.remote. {RemoteClientSettings, MessageSerializer} /** * Type class definition for Actor Serialization @@ -140,7 +140,7 @@ object ActorSerialization { actorRef.getSender, None, ActorType.ScalaActor, - RemoteClient.SECURE_COOKIE).build) + RemoteClientSettings.SECURE_COOKIE).build) requestProtocols.foreach(rp => builder.addMessages(rp)) } diff --git a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala index 00e09a4865..63cc942381 100644 --- a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala +++ b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala @@ -5,7 +5,7 @@ import org.scalatest.matchers.MustMatchers import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import akka.remote.NettyRemoteSupport +import akka.remote.netty.NettyRemoteSupport import akka.actor. {Actor, ActorRegistry} import java.util.concurrent. {TimeUnit, CountDownLatch} diff --git a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala index 3ffff3c3cd..36fd4ae586 100644 --- a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala @@ -8,7 +8,6 @@ import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith import akka.dispatch.Dispatchers -import akka.remote. {NettyRemoteSupport, RemoteServer, RemoteClient} import akka.actor.Actor._ import akka.actor._ diff --git a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala index 02b65d4a30..f97ea75841 100644 --- a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala @@ -7,7 +7,6 @@ package akka.actor.remote import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import akka.serialization.BinaryString import akka.config.Supervision._ -import akka.remote.{RemoteServer, RemoteClient} import akka.OneWay import org.scalatest._ import org.scalatest.WordSpec diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index b119a1e1fc..264831b208 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -4,7 +4,6 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import akka.actor.Actor._ import akka.actor.{ActorRegistry, ActorRef, Actor} -import akka.remote. {NettyRemoteSupport} object ServerInitiatedRemoteActorSpec { case class Send(actor: ActorRef) diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala index 8a97ec3516..99c6f77ea3 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala @@ -6,8 +6,8 @@ package akka.actor.remote import akka.actor._ import akka.actor.Actor._ -import akka.remote.NettyRemoteSupport import java.util.concurrent. {ConcurrentSkipListSet, TimeUnit} +import akka.remote.netty.NettyRemoteSupport object ServerInitiatedRemoteSessionActorSpec { diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala index e26a873b0c..6c7543dbe3 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala @@ -6,7 +6,6 @@ package akka.actor.remote import java.util.concurrent.TimeUnit -import akka.remote.{RemoteServer, RemoteClient} import akka.actor._ import RemoteTypedActorLog._ diff --git a/akka-remote/src/test/scala/serialization/ProtobufActorMessageSerializationSpec.scala b/akka-remote/src/test/scala/serialization/ProtobufActorMessageSerializationSpec.scala index d0dcdac048..2ae30c3b64 100644 --- a/akka-remote/src/test/scala/serialization/ProtobufActorMessageSerializationSpec.scala +++ b/akka-remote/src/test/scala/serialization/ProtobufActorMessageSerializationSpec.scala @@ -1,13 +1,8 @@ package akka.actor.serialization -import java.util.concurrent.TimeUnit -import org.scalatest._ -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers import akka.actor.{ProtobufProtocol, Actor} import ProtobufProtocol.ProtobufPOJO import Actor._ -import akka.remote.NettyRemoteSupport import akka.actor.remote.AkkaRemoteTest /* ---------------------------