Adding initial support in the protocol to get the public host/port of the connecting remote server

This commit is contained in:
Viktor Klang 2011-11-03 18:33:57 +01:00
parent 601df0421c
commit 37ba03eadb
7 changed files with 748 additions and 134 deletions

View file

@ -21,10 +21,6 @@ import java.lang.reflect.InvocationTargetException
class RemoteException(message: String) extends AkkaException(message)
trait RemoteService {
def server: RemoteSupport
}
trait RemoteModule {
protected[akka] def notifyListeners(message: Any): Unit
}
@ -174,9 +170,9 @@ trait RemoteClientModule extends RemoteModule { self: RemoteSupport ⇒
/** Methods that needs to be implemented by a transport **/
protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef],
remoteAddress: InetSocketAddress,
recipient: ActorRef,
loader: Option[ClassLoader]): Unit
}
protected[akka] def send(message: Any,
senderOption: Option[ActorRef],
remoteAddress: InetSocketAddress,
recipient: ActorRef,
loader: Option[ClassLoader]): Unit
}

View file

@ -4,15 +4,11 @@
package akka.util
import akka.dispatch.Envelope
import akka.config.ModuleNotAvailableException
import akka.actor._
import DeploymentConfig.ReplicationScheme
import akka.config.ModuleNotAvailableException
import akka.event.EventHandler
import akka.cluster.ClusterNode
import akka.remote.{ RemoteSupport, RemoteService }
import akka.routing.{ RoutedProps, Router }
import java.net.InetSocketAddress
import akka.AkkaApplication
object ReflectiveAccess {

File diff suppressed because it is too large Load diff

View file

@ -31,8 +31,14 @@ message RemoteMessageProtocol {
* Defines some control messages for the remoting
*/
message RemoteControlProtocol {
optional string cookie = 1;
required CommandType commandType = 2;
required CommandType commandType = 1;
optional string cookie = 2;
optional Endpoint origin = 3;
}
message Endpoint {
required string host = 1;
required uint32 port = 2;
}
/**

View file

@ -27,7 +27,7 @@ import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compressi
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Remote(val app: AkkaApplication) extends RemoteService {
class Remote(val app: AkkaApplication) {
import app._
import app.config

View file

@ -251,9 +251,7 @@ private[akka] case class RemoteActorRef private[akka] (
protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported
def postMessageToMailbox(message: Any, sender: ActorRef) {
remote.send[Any](message, Option(sender), remoteAddress, this, loader)
}
def postMessageToMailbox(message: Any, sender: ActorRef): Unit = remote.send(message, Option(sender), remoteAddress, this, loader)
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout)
@ -265,7 +263,7 @@ private[akka] case class RemoteActorRef private[akka] (
synchronized {
if (running) {
running = false
remote.send[Any](new Terminate(), None, remoteAddress, this, loader)
remote.send(new Terminate(), None, remoteAddress, this, loader)
}
}
}

View file

@ -40,12 +40,12 @@ trait NettyRemoteClientModule extends RemoteClientModule {
def app: AkkaApplication
protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef],
recipientAddress: InetSocketAddress,
recipient: ActorRef,
loader: Option[ClassLoader]): Unit =
withClientFor(recipientAddress, loader) { _.send[T](message, senderOption, recipient) }
protected[akka] def send(message: Any,
senderOption: Option[ActorRef],
recipientAddress: InetSocketAddress,
recipient: ActorRef,
loader: Option[ClassLoader]): Unit =
withClientFor(recipientAddress, loader) { _.send(message, senderOption, recipient) }
private[akka] def withClientFor[T](
address: InetSocketAddress, loader: Option[ClassLoader])(body: RemoteClient T): T = {
@ -140,14 +140,14 @@ abstract class RemoteClient private[akka] (
/**
* Converts the message to the wireprotocol and sends the message across the wire
*/
def send[T](message: Any, senderOption: Option[ActorRef], recipient: ActorRef) {
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef) {
send(createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build)
}
/**
* Sends the message across the wire
*/
def send[T](request: RemoteMessageProtocol) {
def send(request: RemoteMessageProtocol) {
if (isRunning) { //TODO FIXME RACY
app.eventHandler.debug(this, "Sending to connection [%s] message [%s]".format(remoteAddress, new RemoteMessage(request, remoteSupport)))
@ -210,6 +210,7 @@ class ActiveRemoteClient private[akka] (
def sendSecureCookie(connection: ChannelFuture) {
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
if (SECURE_COOKIE.nonEmpty) handshake.setCookie(SECURE_COOKIE.get)
handshake.setOrigin(RemoteProtocol.Endpoint.newBuilder().setHost(app.hostname).setPort(app.port).build)
connection.getChannel.write(createControlEnvelope(handshake.build))
}
@ -353,9 +354,7 @@ class ActiveRemoteClientHandler(
case arp: AkkaRemoteProtocol if arp.hasInstruction
val rcp = arp.getInstruction
rcp.getCommandType match {
case CommandType.SHUTDOWN akka.dispatch.Future {
client.module.shutdownClientConnection(remoteAddress)
}
case CommandType.SHUTDOWN akka.dispatch.Future { client.module.shutdownClientConnection(remoteAddress) }
}
case arp: AkkaRemoteProtocol if arp.hasMessage
@ -570,7 +569,8 @@ class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends Si
case `authenticated` ctx.sendUpstream(event)
case null event.getMessage match {
case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction
remoteProtocol.getInstruction.getCookie match {
val instruction = remoteProtocol.getInstruction
instruction.getCookie match {
case `cookie`
ctx.setAttachment(authenticated)
ctx.sendUpstream(event)