Moved secure cookie exchange to on connect established, this means I could remove the synchronization on send, enabling muuuch more throughput, also, since the cookie isn`t sent in each message, message size should drop considerably when secure cookie handshakes are enabled. I do however have no way of testing this since it seems like the clustering stuff is totally not working when it comes to the RemoteSupport

This commit is contained in:
Viktor Klang 2011-05-20 19:40:11 +02:00
parent b9a1d49999
commit 41a08237d9
4 changed files with 100 additions and 132 deletions

View file

@ -161,7 +161,6 @@ abstract class RemoteClient private[akka] (
}
private[remote] val runSwitch = new Switch()
private[remote] val isAuthenticated = new AtomicBoolean(false)
private[remote] def isRunning = runSwitch.isOn
@ -196,18 +195,10 @@ abstract class RemoteClient private[akka] (
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef): Option[CompletableFuture[T]] = synchronized { // FIXME: find better strategy to prevent race
actorRef: ActorRef): Option[CompletableFuture[T]] =
send(createRemoteMessageProtocolBuilder(
Some(actorRef),
Left(actorRef.uuid),
actorRef.address,
timeout,
Right(message),
isOneWay,
senderOption,
if (isAuthenticated.compareAndSet(false, true)) RemoteClientSettings.SECURE_COOKIE else None).build, senderFuture)
}
Some(actorRef), Left(actorRef.uuid), actorRef.address, timeout, Right(message), isOneWay, senderOption).build,
senderFuture)
/**
* Sends the message across the wire
@ -342,6 +333,14 @@ class ActiveRemoteClient private[akka] (
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
false
} else {
//Send cookie
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
if (SECURE_COOKIE.nonEmpty)
handshake.setCookie(SECURE_COOKIE.get)
connection.getChannel.write(RemoteEncoder.encode(handshake.build))
//Add a task that does GCing of expired Futures
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
@ -361,7 +360,6 @@ class ActiveRemoteClient private[akka] (
} match {
case true true
case false if reconnectIfAlreadyConnected
isAuthenticated.set(false)
openChannels.remove(connection.getChannel)
connection.getChannel.close
connection = bootstrap.connect(remoteAddress)
@ -369,7 +367,15 @@ class ActiveRemoteClient private[akka] (
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
false
} else true
} else {
//Send cookie
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
if (SECURE_COOKIE.nonEmpty)
handshake.setCookie(SECURE_COOKIE.get)
connection.getChannel.write(RemoteEncoder.encode(handshake.build))
true
}
case false false
}
}
@ -577,10 +583,10 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
def shutdown() {
try {
val shutdownSignal = {
val b = RemoteControlProtocol.newBuilder
val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN)
if (RemoteClientSettings.SECURE_COOKIE.nonEmpty)
b.setCookie(RemoteClientSettings.SECURE_COOKIE.get)
b.setCommandType(CommandType.SHUTDOWN)
b.build
}
openChannels.write(RemoteEncoder.encode(shutdownSignal)).awaitUninterruptibly
@ -736,12 +742,39 @@ class RemoteServerPipelineFactory(
MAX_TOTAL_MEMORY_SIZE,
EXECUTION_POOL_KEEPALIVE.length,
EXECUTION_POOL_KEEPALIVE.unit))
val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: execution :: remoteServer :: Nil
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: execution :: authenticator ::: remoteServer :: Nil
new StaticChannelPipeline(stages: _*)
}
}
@ChannelHandler.Sharable
class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends SimpleChannelUpstreamHandler {
val authenticated = new AnyRef
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = secureCookie match {
case None ctx.sendUpstream(event)
case Some(cookie)
ctx.getAttachment match {
case `authenticated` ctx.sendUpstream(event)
case null event.getMessage match {
case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction
remoteProtocol.getInstruction.getCookie match {
case `cookie`
ctx.setAttachment(authenticated)
ctx.sendUpstream(event)
case _
throw new SecurityException(
"The remote client [" + ctx.getChannel.getRemoteAddress + "] secure cookie is not the same as remote server secure cookie")
}
case _
throw new SecurityException("The remote client [" + ctx.getChannel.getRemoteAddress + "] is not Authorized!")
}
}
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -752,7 +785,6 @@ class RemoteServerHandler(
val applicationLoader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
import RemoteServerSettings._
val CHANNEL_INIT = "channel-init".intern
applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY
@ -786,7 +818,6 @@ class RemoteServerHandler(
val clientAddress = getClientAddress(ctx)
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
if (REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
@ -810,11 +841,8 @@ class RemoteServerHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = event.getMessage match {
case null throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
//case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction => RemoteServer cannot receive control messages (yet)
case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasMessage
val requestProtocol = remoteProtocol.getMessage
if (REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx)
handleRemoteMessageProtocol(requestProtocol, event.getChannel)
case remote: AkkaRemoteProtocol if remote.hasMessage handleRemoteMessageProtocol(remote.getMessage, event.getChannel)
//case remote: AkkaRemoteProtocol if remote.hasInstruction => RemoteServer cannot receive control messages (yet)
case _ //ignore
}
@ -874,8 +902,7 @@ class RemoteServerHandler(
actorInfo.getTimeout,
r,
true,
Some(actorRef),
None)
Some(actorRef))
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
@ -939,26 +966,11 @@ class RemoteServerHandler(
actorInfo.getTimeout,
Left(exception),
true,
None,
None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
RemoteEncoder.encode(messageBuilder.build)
}
private def authenticateRemoteClient(request: RemoteMessageProtocol, ctx: ChannelHandlerContext) = {
val attachment = ctx.getAttachment
if ((attachment ne null) &&
attachment.isInstanceOf[String] &&
attachment.asInstanceOf[String] == CHANNEL_INIT) { // is first time around, channel initialization
ctx.setAttachment(null)
val clientAddress = ctx.getChannel.getRemoteAddress.toString
if (!request.hasCookie) throw new SecurityException(
"The remote client [" + clientAddress + "] does not have a secure cookie.")
if (!(request.getCookie == SECURE_COOKIE.get)) throw new SecurityException(
"The remote client [" + clientAddress + "] secure cookie is not the same as remote server secure cookie")
}
}
protected def parseUuid(protocol: UuidProtocol): Uuid = uuidFrom(protocol.getHigh, protocol.getLow)
}