Completed Erlang-style cookie handshake between RemoteClient and RemoteServer
This commit is contained in:
commit
52f5e5e861
9 changed files with 187 additions and 211 deletions
|
|
@ -139,35 +139,27 @@ class Switch(startAsOn: Boolean = false) {
|
||||||
def switchOn: Boolean = synchronized { switch.compareAndSet(false, true) }
|
def switchOn: Boolean = synchronized { switch.compareAndSet(false, true) }
|
||||||
|
|
||||||
def ifOnYield[T](action: => T): Option[T] = {
|
def ifOnYield[T](action: => T): Option[T] = {
|
||||||
if (switch.get)
|
if (switch.get) Some(action)
|
||||||
Some(action)
|
else None
|
||||||
else
|
|
||||||
None
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def ifOffYield[T](action: => T): Option[T] = {
|
def ifOffYield[T](action: => T): Option[T] = {
|
||||||
if (switch.get)
|
if (switch.get) Some(action)
|
||||||
Some(action)
|
else None
|
||||||
else
|
|
||||||
None
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def ifOn(action: => Unit): Boolean = {
|
def ifOn(action: => Unit): Boolean = {
|
||||||
if (switch.get) {
|
if (switch.get) {
|
||||||
action
|
action
|
||||||
true
|
true
|
||||||
}
|
} else false
|
||||||
else
|
|
||||||
false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def ifOff(action: => Unit): Boolean = {
|
def ifOff(action: => Unit): Boolean = {
|
||||||
if (!switch.get) {
|
if (!switch.get) {
|
||||||
action
|
action
|
||||||
true
|
true
|
||||||
}
|
} else false
|
||||||
else
|
|
||||||
false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def isOn = switch.get
|
def isOn = switch.get
|
||||||
|
|
|
||||||
|
|
@ -91,6 +91,13 @@ message TypedActorInfoProtocol {
|
||||||
required string method = 2;
|
required string method = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Defines a remote connection handshake.
|
||||||
|
*/
|
||||||
|
//message HandshakeProtocol {
|
||||||
|
// required string cookie = 1;
|
||||||
|
//}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defines a remote message request.
|
* Defines a remote message request.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -5,13 +5,15 @@
|
||||||
package se.scalablesolutions.akka.remote
|
package se.scalablesolutions.akka.remote
|
||||||
|
|
||||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ ActorType => ActorTypeProtocol, _ }
|
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ ActorType => ActorTypeProtocol, _ }
|
||||||
import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, ActorType, RemoteActorRef, IllegalActorStateException}
|
import se.scalablesolutions.akka.actor._
|
||||||
|
//import se.scalablesolutions.akka.actor.Uuid.{newUuid, uuidFrom}
|
||||||
import se.scalablesolutions.akka.dispatch.{ DefaultCompletableFuture, CompletableFuture }
|
import se.scalablesolutions.akka.dispatch.{ DefaultCompletableFuture, CompletableFuture }
|
||||||
import se.scalablesolutions.akka.actor.{Uuid,newUuid,uuidFrom}
|
import se.scalablesolutions.akka.util._
|
||||||
import se.scalablesolutions.akka.config.Config._
|
import se.scalablesolutions.akka.config.Config._
|
||||||
import se.scalablesolutions.akka.serialization.RemoteActorSerialization._
|
import se.scalablesolutions.akka.serialization.RemoteActorSerialization._
|
||||||
import se.scalablesolutions.akka.AkkaException
|
import se.scalablesolutions.akka.AkkaException
|
||||||
import Actor._
|
import Actor._
|
||||||
|
|
||||||
import org.jboss.netty.channel._
|
import org.jboss.netty.channel._
|
||||||
import group.DefaultChannelGroup
|
import group.DefaultChannelGroup
|
||||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
|
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
|
||||||
|
|
@ -25,12 +27,10 @@ import org.jboss.netty.handler.ssl.SslHandler
|
||||||
|
|
||||||
import java.net.{ SocketAddress, InetSocketAddress }
|
import java.net.{ SocketAddress, InetSocketAddress }
|
||||||
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
|
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
|
||||||
|
|
||||||
import scala.collection.mutable.{ HashSet, HashMap }
|
import scala.collection.mutable.{ HashSet, HashMap }
|
||||||
import scala.reflect.BeanProperty
|
import scala.reflect.BeanProperty
|
||||||
import se.scalablesolutions.akka.actor._
|
|
||||||
import se.scalablesolutions.akka.util._
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Life-cycle events for RemoteClient.
|
* Life-cycle events for RemoteClient.
|
||||||
|
|
@ -59,6 +59,7 @@ class RemoteClientException private[akka](message: String, @BeanProperty val cli
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
object RemoteClient extends Logging {
|
object RemoteClient extends Logging {
|
||||||
|
|
||||||
val SECURE_COOKIE: Option[String] = {
|
val SECURE_COOKIE: Option[String] = {
|
||||||
val cookie = config.getString("akka.remote.secure-cookie", "")
|
val cookie = config.getString("akka.remote.secure-cookie", "")
|
||||||
if (cookie == "") None
|
if (cookie == "") None
|
||||||
|
|
@ -206,34 +207,40 @@ class RemoteClient private[akka] (
|
||||||
private val remoteAddress = new InetSocketAddress(hostname, port)
|
private val remoteAddress = new InetSocketAddress(hostname, port)
|
||||||
|
|
||||||
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
|
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
|
||||||
@volatile private var bootstrap: ClientBootstrap = _
|
@volatile
|
||||||
@volatile private[remote] var connection: ChannelFuture = _
|
private var bootstrap: ClientBootstrap = _
|
||||||
@volatile private[remote] var openChannels: DefaultChannelGroup = _
|
@volatile
|
||||||
@volatile private var timer: HashedWheelTimer = _
|
private[remote] var connection: ChannelFuture = _
|
||||||
|
@volatile
|
||||||
|
private[remote] var openChannels: DefaultChannelGroup = _
|
||||||
|
@volatile
|
||||||
|
private var timer: HashedWheelTimer = _
|
||||||
private[remote] val runSwitch = new Switch()
|
private[remote] val runSwitch = new Switch()
|
||||||
|
private[remote] val isAuthenticated = new AtomicBoolean(false)
|
||||||
|
|
||||||
private[remote] def isRunning = runSwitch.isOn
|
private[remote] def isRunning = runSwitch.isOn
|
||||||
|
|
||||||
private val reconnectionTimeWindow = Duration(config.getInt(
|
private val reconnectionTimeWindow = Duration(config.getInt(
|
||||||
"akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis
|
"akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis
|
||||||
@volatile private var reconnectionTimeWindowStart = 0L
|
@volatile
|
||||||
|
private var reconnectionTimeWindowStart = 0L
|
||||||
|
|
||||||
def connect = runSwitch switchOn {
|
def connect = runSwitch switchOn {
|
||||||
openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName)
|
openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName)
|
||||||
timer = new HashedWheelTimer
|
timer = new HashedWheelTimer
|
||||||
bootstrap = new ClientBootstrap(
|
|
||||||
new NioClientSocketChannelFactory(
|
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
|
||||||
Executors.newCachedThreadPool,Executors.newCachedThreadPool
|
|
||||||
)
|
|
||||||
)
|
|
||||||
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
|
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
|
||||||
bootstrap.setOption("tcpNoDelay", true)
|
bootstrap.setOption("tcpNoDelay", true)
|
||||||
bootstrap.setOption("keepAlive", true)
|
bootstrap.setOption("keepAlive", true)
|
||||||
connection = bootstrap.connect(remoteAddress)
|
|
||||||
log.info("Starting remote client connection to [%s:%s]", hostname, port)
|
log.info("Starting remote client connection to [%s:%s]", hostname, port)
|
||||||
|
|
||||||
// Wait until the connection attempt succeeds or fails.
|
// Wait until the connection attempt succeeds or fails.
|
||||||
|
connection = bootstrap.connect(remoteAddress)
|
||||||
val channel = connection.awaitUninterruptibly.getChannel
|
val channel = connection.awaitUninterruptibly.getChannel
|
||||||
openChannels.add(channel)
|
openChannels.add(channel)
|
||||||
|
|
||||||
if (!connection.isSuccess) {
|
if (!connection.isSuccess) {
|
||||||
notifyListeners(RemoteClientError(connection.getCause, this))
|
notifyListeners(RemoteClientError(connection.getCause, this))
|
||||||
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
|
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
|
||||||
|
|
@ -274,14 +281,16 @@ class RemoteClient private[akka] (
|
||||||
actorRef: ActorRef,
|
actorRef: ActorRef,
|
||||||
typedActorInfo: Option[Tuple2[String, String]],
|
typedActorInfo: Option[Tuple2[String, String]],
|
||||||
actorType: ActorType): Option[CompletableFuture[T]] = {
|
actorType: ActorType): Option[CompletableFuture[T]] = {
|
||||||
|
val cookie = if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE
|
||||||
|
else None
|
||||||
send(createRemoteRequestProtocolBuilder(
|
send(createRemoteRequestProtocolBuilder(
|
||||||
actorRef, message, isOneWay, senderOption, typedActorInfo, actorType, RemoteClient.SECURE_COOKIE).build, senderFuture)
|
actorRef, message, isOneWay, senderOption, typedActorInfo, actorType, cookie).build, senderFuture)
|
||||||
}
|
}
|
||||||
|
|
||||||
def send[T](
|
def send[T](
|
||||||
request: RemoteRequestProtocol,
|
request: RemoteRequestProtocol,
|
||||||
senderFuture: Option[CompletableFuture[T]]):
|
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
|
||||||
Option[CompletableFuture[T]] = if (isRunning) {
|
if (isRunning) {
|
||||||
if (request.getIsOneWay) {
|
if (request.getIsOneWay) {
|
||||||
connection.getChannel.write(request)
|
connection.getChannel.write(request)
|
||||||
None
|
None
|
||||||
|
|
@ -300,6 +309,7 @@ class RemoteClient private[akka] (
|
||||||
notifyListeners(RemoteClientError(exception, this))
|
notifyListeners(RemoteClientError(exception, this))
|
||||||
throw exception
|
throw exception
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private[akka] def registerSupervisorForActor(actorRef: ActorRef) =
|
private[akka] def registerSupervisorForActor(actorRef: ActorRef) =
|
||||||
if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException(
|
if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException(
|
||||||
|
|
@ -430,6 +440,7 @@ class RemoteClientHandler(
|
||||||
timer.newTimeout(new TimerTask() {
|
timer.newTimeout(new TimerTask() {
|
||||||
def run(timeout: Timeout) = {
|
def run(timeout: Timeout) = {
|
||||||
client.openChannels.remove(event.getChannel)
|
client.openChannels.remove(event.getChannel)
|
||||||
|
client.isAuthenticated.set(false)
|
||||||
log.debug("Remote client reconnecting to [%s]", remoteAddress)
|
log.debug("Remote client reconnecting to [%s]", remoteAddress)
|
||||||
client.connection = bootstrap.connect(remoteAddress)
|
client.connection = bootstrap.connect(remoteAddress)
|
||||||
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
|
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
|
||||||
|
|
|
||||||
|
|
@ -212,13 +212,14 @@ class RemoteServer extends Logging with ListenerManagement {
|
||||||
address = Address(_hostname,_port)
|
address = Address(_hostname,_port)
|
||||||
log.info("Starting remote server at [%s:%s]", hostname, port)
|
log.info("Starting remote server at [%s:%s]", hostname, port)
|
||||||
RemoteServer.register(hostname, port, this)
|
RemoteServer.register(hostname, port, this)
|
||||||
val pipelineFactory = new RemoteServerPipelineFactory(
|
|
||||||
name, openChannels, loader, this)
|
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, this)
|
||||||
bootstrap.setPipelineFactory(pipelineFactory)
|
bootstrap.setPipelineFactory(pipelineFactory)
|
||||||
bootstrap.setOption("child.tcpNoDelay", true)
|
bootstrap.setOption("child.tcpNoDelay", true)
|
||||||
bootstrap.setOption("child.keepAlive", true)
|
bootstrap.setOption("child.keepAlive", true)
|
||||||
bootstrap.setOption("child.reuseAddress", true)
|
bootstrap.setOption("child.reuseAddress", true)
|
||||||
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
|
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
|
||||||
|
|
||||||
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
|
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
|
||||||
_isRunning = true
|
_isRunning = true
|
||||||
Cluster.registerLocalNode(hostname, port)
|
Cluster.registerLocalNode(hostname, port)
|
||||||
|
|
@ -260,11 +261,8 @@ class RemoteServer extends Logging with ListenerManagement {
|
||||||
*/
|
*/
|
||||||
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
|
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
|
||||||
log.debug("Registering server side remote typed actor [%s] with id [%s]", typedActor.getClass.getName, id)
|
log.debug("Registering server side remote typed actor [%s] with id [%s]", typedActor.getClass.getName, id)
|
||||||
if (id.startsWith(UUID_PREFIX)) {
|
if (id.startsWith(UUID_PREFIX)) registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid)
|
||||||
registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid())
|
else registerTypedActor(id, typedActor, typedActors)
|
||||||
} else {
|
|
||||||
registerTypedActor(id, typedActor, typedActors())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -279,28 +277,19 @@ class RemoteServer extends Logging with ListenerManagement {
|
||||||
*/
|
*/
|
||||||
def register(id: String, actorRef: ActorRef): Unit = synchronized {
|
def register(id: String, actorRef: ActorRef): Unit = synchronized {
|
||||||
log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
|
log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
|
||||||
if (id.startsWith(UUID_PREFIX)) {
|
if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid)
|
||||||
register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid())
|
else register(id, actorRef, actors)
|
||||||
} else {
|
|
||||||
register(id, actorRef, actors())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
|
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
|
||||||
if (_isRunning) {
|
if (_isRunning && !registry.contains(id)) {
|
||||||
if (!registry.contains(id)) {
|
|
||||||
if (!actorRef.isRunning) actorRef.start
|
if (!actorRef.isRunning) actorRef.start
|
||||||
registry.put(id, actorRef)
|
registry.put(id, actorRef)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) {
|
private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) {
|
||||||
if (_isRunning) {
|
if (_isRunning && !registry.contains(id)) registry.put(id, typedActor)
|
||||||
if (!registry.contains(id)) {
|
|
||||||
registry.put(id, typedActor)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -309,8 +298,8 @@ class RemoteServer extends Logging with ListenerManagement {
|
||||||
def unregister(actorRef: ActorRef):Unit = synchronized {
|
def unregister(actorRef: ActorRef):Unit = synchronized {
|
||||||
if (_isRunning) {
|
if (_isRunning) {
|
||||||
log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid)
|
log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid)
|
||||||
actors().remove(actorRef.id,actorRef)
|
actors.remove(actorRef.id, actorRef)
|
||||||
actorsByUuid().remove(actorRef.uuid,actorRef)
|
actorsByUuid.remove(actorRef.uuid, actorRef)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -322,12 +311,11 @@ class RemoteServer extends Logging with ListenerManagement {
|
||||||
def unregister(id: String):Unit = synchronized {
|
def unregister(id: String):Unit = synchronized {
|
||||||
if (_isRunning) {
|
if (_isRunning) {
|
||||||
log.info("Unregistering server side remote actor with id [%s]", id)
|
log.info("Unregistering server side remote actor with id [%s]", id)
|
||||||
if (id.startsWith(UUID_PREFIX)) {
|
if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length))
|
||||||
actorsByUuid().remove(id.substring(UUID_PREFIX.length))
|
else {
|
||||||
} else {
|
val actorRef = actors get id
|
||||||
val actorRef = actors() get id
|
actorsByUuid.remove(actorRef.uuid, actorRef)
|
||||||
actorsByUuid().remove(actorRef.uuid,actorRef)
|
actors.remove(id,actorRef)
|
||||||
actors().remove(id,actorRef)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -340,11 +328,8 @@ class RemoteServer extends Logging with ListenerManagement {
|
||||||
def unregisterTypedActor(id: String):Unit = synchronized {
|
def unregisterTypedActor(id: String):Unit = synchronized {
|
||||||
if (_isRunning) {
|
if (_isRunning) {
|
||||||
log.info("Unregistering server side remote typed actor with id [%s]", id)
|
log.info("Unregistering server side remote typed actor with id [%s]", id)
|
||||||
if (id.startsWith(UUID_PREFIX)) {
|
if (id.startsWith(UUID_PREFIX)) typedActorsByUuid.remove(id.substring(UUID_PREFIX.length))
|
||||||
typedActorsByUuid().remove(id.substring(UUID_PREFIX.length))
|
else typedActors.remove(id)
|
||||||
} else {
|
|
||||||
typedActors().remove(id)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -352,10 +337,10 @@ class RemoteServer extends Logging with ListenerManagement {
|
||||||
|
|
||||||
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
|
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
|
||||||
|
|
||||||
private[akka] def actors() = ActorRegistry.actors(address)
|
private[akka] def actors = ActorRegistry.actors(address)
|
||||||
private[akka] def actorsByUuid() = ActorRegistry.actorsByUuid(address)
|
private[akka] def actorsByUuid = ActorRegistry.actorsByUuid(address)
|
||||||
private[akka] def typedActors() = ActorRegistry.typedActors(address)
|
private[akka] def typedActors = ActorRegistry.typedActors(address)
|
||||||
private[akka] def typedActorsByUuid() = ActorRegistry.typedActorsByUuid(address)
|
private[akka] def typedActorsByUuid = ActorRegistry.typedActorsByUuid(address)
|
||||||
}
|
}
|
||||||
|
|
||||||
object RemoteServerSslContext {
|
object RemoteServerSslContext {
|
||||||
|
|
@ -419,6 +404,7 @@ class RemoteServerHandler(
|
||||||
val applicationLoader: Option[ClassLoader],
|
val applicationLoader: Option[ClassLoader],
|
||||||
val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging {
|
val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging {
|
||||||
import RemoteServer._
|
import RemoteServer._
|
||||||
|
|
||||||
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
|
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
|
||||||
val CHANNEL_INIT = "channel-init".intern
|
val CHANNEL_INIT = "channel-init".intern
|
||||||
|
|
||||||
|
|
@ -444,10 +430,8 @@ class RemoteServerHandler(
|
||||||
} else future.getChannel.close
|
} else future.getChannel.close
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else {
|
} else server.notifyListeners(RemoteServerClientConnected(server))
|
||||||
server.notifyListeners(RemoteServerClientConnected(server))
|
if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication
|
||||||
}
|
|
||||||
if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||||
|
|
@ -467,7 +451,7 @@ class RemoteServerHandler(
|
||||||
if (message eq null) throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
|
if (message eq null) throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
|
||||||
if (message.isInstanceOf[RemoteRequestProtocol]) {
|
if (message.isInstanceOf[RemoteRequestProtocol]) {
|
||||||
val requestProtocol = message.asInstanceOf[RemoteRequestProtocol]
|
val requestProtocol = message.asInstanceOf[RemoteRequestProtocol]
|
||||||
authenticateRemoteClient(requestProtocol, ctx)
|
if (RemoteServer.REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx)
|
||||||
handleRemoteRequestProtocol(requestProtocol, event.getChannel)
|
handleRemoteRequestProtocol(requestProtocol, event.getChannel)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -521,8 +505,7 @@ class RemoteServerHandler(
|
||||||
try {
|
try {
|
||||||
channel.write(replyBuilder.build)
|
channel.write(replyBuilder.build)
|
||||||
} catch {
|
} catch {
|
||||||
case e: Throwable =>
|
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
||||||
server.notifyListeners(RemoteServerError(e, server))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -530,8 +513,7 @@ class RemoteServerHandler(
|
||||||
try {
|
try {
|
||||||
channel.write(createErrorReplyMessage(exception, request, true))
|
channel.write(createErrorReplyMessage(exception, request, true))
|
||||||
} catch {
|
} catch {
|
||||||
case e: Throwable =>
|
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
||||||
server.notifyListeners(RemoteServerError(e, server))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -543,8 +525,8 @@ class RemoteServerHandler(
|
||||||
val actorInfo = request.getActorInfo
|
val actorInfo = request.getActorInfo
|
||||||
val typedActorInfo = actorInfo.getTypedActorInfo
|
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||||
log.debug("Dispatching to remote typed actor [%s :: %s]", typedActorInfo.getMethod, typedActorInfo.getInterface)
|
log.debug("Dispatching to remote typed actor [%s :: %s]", typedActorInfo.getMethod, typedActorInfo.getInterface)
|
||||||
val typedActor = createTypedActor(actorInfo)
|
|
||||||
|
|
||||||
|
val typedActor = createTypedActor(actorInfo)
|
||||||
val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList
|
val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList
|
||||||
val argClasses = args.map(_.getClass)
|
val argClasses = args.map(_.getClass)
|
||||||
|
|
||||||
|
|
@ -573,42 +555,32 @@ class RemoteServerHandler(
|
||||||
}
|
}
|
||||||
|
|
||||||
private def findActorById(id: String) : ActorRef = {
|
private def findActorById(id: String) : ActorRef = {
|
||||||
server.actors().get(id)
|
server.actors.get(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def findActorByUuid(uuid: String) : ActorRef = {
|
private def findActorByUuid(uuid: String) : ActorRef = {
|
||||||
server.actorsByUuid().get(uuid)
|
server.actorsByUuid.get(uuid)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def findTypedActorById(id: String) : AnyRef = {
|
private def findTypedActorById(id: String) : AnyRef = {
|
||||||
server.typedActors().get(id)
|
server.typedActors.get(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def findTypedActorByUuid(uuid: String) : AnyRef = {
|
private def findTypedActorByUuid(uuid: String) : AnyRef = {
|
||||||
server.typedActorsByUuid().get(uuid)
|
server.typedActorsByUuid.get(uuid)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
|
private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
|
||||||
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) {
|
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findActorByUuid(id.substring(UUID_PREFIX.length))
|
||||||
findActorByUuid(id.substring(UUID_PREFIX.length))
|
else findActorById(id)
|
||||||
} else {
|
if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid)
|
||||||
findActorById(id)
|
|
||||||
}
|
|
||||||
if (actorRefOrNull eq null) {
|
|
||||||
actorRefOrNull = findActorByUuid(uuid)
|
|
||||||
}
|
|
||||||
actorRefOrNull
|
actorRefOrNull
|
||||||
}
|
}
|
||||||
|
|
||||||
private def findTypedActorByIdOrUuid(id: String, uuid: String) : AnyRef = {
|
private def findTypedActorByIdOrUuid(id: String, uuid: String) : AnyRef = {
|
||||||
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) {
|
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findTypedActorByUuid(id.substring(UUID_PREFIX.length))
|
||||||
findTypedActorByUuid(id.substring(UUID_PREFIX.length))
|
else findTypedActorById(id)
|
||||||
} else {
|
if (actorRefOrNull eq null) actorRefOrNull = findTypedActorByUuid(uuid)
|
||||||
findTypedActorById(id)
|
|
||||||
}
|
|
||||||
if (actorRefOrNull eq null) {
|
|
||||||
actorRefOrNull = findTypedActorByUuid(uuid)
|
|
||||||
}
|
|
||||||
actorRefOrNull
|
actorRefOrNull
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -694,11 +666,11 @@ class RemoteServerHandler(
|
||||||
}
|
}
|
||||||
|
|
||||||
private def authenticateRemoteClient(request: RemoteRequestProtocol, ctx: ChannelHandlerContext) = {
|
private def authenticateRemoteClient(request: RemoteRequestProtocol, ctx: ChannelHandlerContext) = {
|
||||||
if (RemoteServer.REQUIRE_COOKIE) {
|
|
||||||
val attachment = ctx.getAttachment
|
val attachment = ctx.getAttachment
|
||||||
if ((attachment ne null) &&
|
if ((attachment ne null) &&
|
||||||
attachment.isInstanceOf[String] &&
|
attachment.isInstanceOf[String] &&
|
||||||
attachment.asInstanceOf[String] == CHANNEL_INIT) {
|
attachment.asInstanceOf[String] == CHANNEL_INIT) { // is first time around, channel initialization
|
||||||
|
ctx.setAttachment(null)
|
||||||
val clientAddress = ctx.getChannel.getRemoteAddress.toString
|
val clientAddress = ctx.getChannel.getRemoteAddress.toString
|
||||||
if (!request.hasCookie) throw new SecurityException(
|
if (!request.hasCookie) throw new SecurityException(
|
||||||
"The remote client [" + clientAddress + "] does not have a secure cookie.")
|
"The remote client [" + clientAddress + "] does not have a secure cookie.")
|
||||||
|
|
@ -708,4 +680,3 @@ class RemoteServerHandler(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -263,8 +263,7 @@ object RemoteActorSerialization {
|
||||||
senderOption: Option[ActorRef],
|
senderOption: Option[ActorRef],
|
||||||
typedActorInfo: Option[Tuple2[String, String]],
|
typedActorInfo: Option[Tuple2[String, String]],
|
||||||
actorType: ActorType,
|
actorType: ActorType,
|
||||||
secureCookie: Option[String]):
|
secureCookie: Option[String]): RemoteRequestProtocol.Builder = {
|
||||||
RemoteRequestProtocol.Builder = {
|
|
||||||
import actorRef._
|
import actorRef._
|
||||||
|
|
||||||
val actorInfoBuilder = ActorInfoProtocol.newBuilder
|
val actorInfoBuilder = ActorInfoProtocol.newBuilder
|
||||||
|
|
@ -273,8 +272,7 @@ object RemoteActorSerialization {
|
||||||
.setTarget(actorClassName)
|
.setTarget(actorClassName)
|
||||||
.setTimeout(timeout)
|
.setTimeout(timeout)
|
||||||
|
|
||||||
typedActorInfo.foreach {
|
typedActorInfo.foreach { typedActor =>
|
||||||
typedActor =>
|
|
||||||
actorInfoBuilder.setTypedActorInfo(
|
actorInfoBuilder.setTypedActorInfo(
|
||||||
TypedActorInfoProtocol.newBuilder
|
TypedActorInfoProtocol.newBuilder
|
||||||
.setInterface(typedActor._1)
|
.setInterface(typedActor._1)
|
||||||
|
|
@ -310,8 +308,6 @@ object RemoteActorSerialization {
|
||||||
}
|
}
|
||||||
requestBuilder
|
requestBuilder
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -408,5 +404,4 @@ object RemoteTypedActorSerialization {
|
||||||
.setInterfaceName(init.interfaceClass.getName)
|
.setInterfaceName(init.interfaceClass.getName)
|
||||||
.build
|
.build
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -201,18 +201,18 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
|
||||||
def shouldRegisterAndUnregister {
|
def shouldRegisterAndUnregister {
|
||||||
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
|
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
|
||||||
server.register("my-service-1", actor1)
|
server.register("my-service-1", actor1)
|
||||||
assert(server.actors().get("my-service-1") ne null, "actor registered")
|
assert(server.actors.get("my-service-1") ne null, "actor registered")
|
||||||
server.unregister("my-service-1")
|
server.unregister("my-service-1")
|
||||||
assert(server.actors().get("my-service-1") eq null, "actor unregistered")
|
assert(server.actors.get("my-service-1") eq null, "actor unregistered")
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def shouldRegisterAndUnregisterByUuid {
|
def shouldRegisterAndUnregisterByUuid {
|
||||||
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
|
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
|
||||||
server.register("uuid:" + actor1.uuid, actor1)
|
server.register("uuid:" + actor1.uuid, actor1)
|
||||||
assert(server.actorsByUuid().get(actor1.uuid.toString) ne null, "actor registered")
|
assert(server.actorsByUuid.get(actor1.uuid.toString) ne null, "actor registered")
|
||||||
server.unregister("uuid:" + actor1.uuid)
|
server.unregister("uuid:" + actor1.uuid)
|
||||||
assert(server.actorsByUuid().get(actor1.uuid) eq null, "actor unregistered")
|
assert(server.actorsByUuid.get(actor1.uuid) eq null, "actor unregistered")
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -103,9 +103,9 @@ class ServerInitiatedRemoteTypedActorSpec extends
|
||||||
it("should register and unregister typed actors") {
|
it("should register and unregister typed actors") {
|
||||||
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
|
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
|
||||||
server.registerTypedActor("my-test-service", typedActor)
|
server.registerTypedActor("my-test-service", typedActor)
|
||||||
assert(server.typedActors().get("my-test-service") ne null, "typed actor registered")
|
assert(server.typedActors.get("my-test-service") ne null, "typed actor registered")
|
||||||
server.unregisterTypedActor("my-test-service")
|
server.unregisterTypedActor("my-test-service")
|
||||||
assert(server.typedActors().get("my-test-service") eq null, "typed actor unregistered")
|
assert(server.typedActors.get("my-test-service") eq null, "typed actor unregistered")
|
||||||
}
|
}
|
||||||
|
|
||||||
it("should register and unregister typed actors by uuid") {
|
it("should register and unregister typed actors by uuid") {
|
||||||
|
|
@ -113,9 +113,9 @@ class ServerInitiatedRemoteTypedActorSpec extends
|
||||||
val init = AspectInitRegistry.initFor(typedActor)
|
val init = AspectInitRegistry.initFor(typedActor)
|
||||||
val uuid = "uuid:" + init.actorRef.uuid
|
val uuid = "uuid:" + init.actorRef.uuid
|
||||||
server.registerTypedActor(uuid, typedActor)
|
server.registerTypedActor(uuid, typedActor)
|
||||||
assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) ne null, "typed actor registered")
|
assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) ne null, "typed actor registered")
|
||||||
server.unregisterTypedActor(uuid)
|
server.unregisterTypedActor(uuid)
|
||||||
assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) eq null, "typed actor unregistered")
|
assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) eq null, "typed actor unregistered")
|
||||||
}
|
}
|
||||||
|
|
||||||
it("should find typed actors by uuid") {
|
it("should find typed actors by uuid") {
|
||||||
|
|
@ -123,7 +123,7 @@ class ServerInitiatedRemoteTypedActorSpec extends
|
||||||
val init = AspectInitRegistry.initFor(typedActor)
|
val init = AspectInitRegistry.initFor(typedActor)
|
||||||
val uuid = "uuid:" + init.actorRef.uuid
|
val uuid = "uuid:" + init.actorRef.uuid
|
||||||
server.registerTypedActor(uuid, typedActor)
|
server.registerTypedActor(uuid, typedActor)
|
||||||
assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) ne null, "typed actor registered")
|
assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) ne null, "typed actor registered")
|
||||||
|
|
||||||
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], uuid, HOSTNAME, PORT)
|
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], uuid, HOSTNAME, PORT)
|
||||||
expect("oneway") {
|
expect("oneway") {
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ object AkkaRepositories {
|
||||||
val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
|
val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
|
||||||
val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org")
|
val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org")
|
||||||
val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
|
val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
|
||||||
val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
|
val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/")
|
||||||
val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
|
val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
|
||||||
val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
|
val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
|
||||||
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
||||||
|
|
|
||||||
|
|
@ -72,7 +72,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
||||||
lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString)
|
lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString)
|
||||||
lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots")
|
lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots")
|
||||||
lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
|
lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
|
||||||
lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
|
lazy val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/")
|
||||||
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
|
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
|
||||||
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
|
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
|
||||||
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue