Added Erlang-style secure cookie authentication for remote client/server
This commit is contained in:
parent
00feb8a68c
commit
cbc10111af
10 changed files with 262 additions and 69 deletions
|
|
@ -16,6 +16,7 @@ import se.scalablesolutions.akka.util._
|
|||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
|
||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.ActorType._
|
||||
import se.scalablesolutions.akka.config.Config._
|
||||
import se.scalablesolutions.akka.config.ConfigurationException
|
||||
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
|
||||
import se.scalablesolutions.akka.serialization.RemoteActorSerialization
|
||||
import se.scalablesolutions.akka.serialization.RemoteActorSerialization._
|
||||
|
|
@ -61,21 +62,30 @@ import scala.reflect.BeanProperty
|
|||
object RemoteNode extends RemoteServer
|
||||
|
||||
/**
|
||||
* For internal use only.
|
||||
* Holds configuration variables, remote actors, remote typed actors and remote servers.
|
||||
* For internal use only. Holds configuration variables, remote actors, remote typed actors and remote servers.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object
|
||||
RemoteServer {
|
||||
object RemoteServer {
|
||||
val UUID_PREFIX = "uuid:"
|
||||
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
|
||||
val PORT = config.getInt("akka.remote.server.port", 9999)
|
||||
|
||||
val SECURE_COOKIE: Option[String] = {
|
||||
val cookie = config.getString("akka.remote.secure-cookie", "")
|
||||
if (cookie == "") None
|
||||
else Some(cookie)
|
||||
}
|
||||
val REQUIRE_COOKIE = {
|
||||
val requireCookie = config.getBool("akka.remote.server.require-cookie", true)
|
||||
if (RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
|
||||
requireCookie
|
||||
}
|
||||
|
||||
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
|
||||
val PORT = config.getInt("akka.remote.server.port", 9999)
|
||||
val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT)
|
||||
|
||||
val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib")
|
||||
val ZLIB_COMPRESSION_LEVEL = {
|
||||
val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib")
|
||||
val ZLIB_COMPRESSION_LEVEL = {
|
||||
val level = config.getInt("akka.remote.zlib-compression-level", 6)
|
||||
if (level < 1 && level > 9) throw new IllegalArgumentException(
|
||||
"zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed")
|
||||
|
|
@ -128,7 +138,6 @@ RemoteServer {
|
|||
private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard {
|
||||
remoteServers.remove(Address(hostname, port))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -389,7 +398,7 @@ class RemoteServerPipelineFactory(
|
|||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val (enc,dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
|
||||
case _ => (join(), join())
|
||||
}
|
||||
|
|
@ -411,6 +420,7 @@ class RemoteServerHandler(
|
|||
val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging {
|
||||
import RemoteServer._
|
||||
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
|
||||
val CHANNEL_INIT = "channel-init".intern
|
||||
|
||||
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
|
||||
|
||||
|
|
@ -437,6 +447,7 @@ class RemoteServerHandler(
|
|||
} else {
|
||||
server.notifyListeners(RemoteServerClientConnected(server))
|
||||
}
|
||||
if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT)
|
||||
}
|
||||
|
||||
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
|
|
@ -445,8 +456,7 @@ class RemoteServerHandler(
|
|||
}
|
||||
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
if (event.isInstanceOf[ChannelStateEvent] &&
|
||||
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
|
||||
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
|
||||
log.debug(event.toString)
|
||||
}
|
||||
super.handleUpstream(ctx, event)
|
||||
|
|
@ -456,7 +466,9 @@ class RemoteServerHandler(
|
|||
val message = event.getMessage
|
||||
if (message eq null) throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
|
||||
if (message.isInstanceOf[RemoteRequestProtocol]) {
|
||||
handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel)
|
||||
val requestProtocol = message.asInstanceOf[RemoteRequestProtocol]
|
||||
authenticateRemoteClient(requestProtocol, ctx)
|
||||
handleRemoteRequestProtocol(requestProtocol, event.getChannel)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -491,8 +503,11 @@ class RemoteServerHandler(
|
|||
case RemoteActorSystemMessage.Stop => actorRef.stop
|
||||
case _ => // then match on user defined messages
|
||||
if (request.getIsOneWay) actorRef.!(message)(sender)
|
||||
else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message,request.getActorInfo.getTimeout,None,Some(
|
||||
new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){
|
||||
else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(
|
||||
message,
|
||||
request.getActorInfo.getTimeout,
|
||||
None,
|
||||
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){
|
||||
override def onComplete(result: AnyRef) {
|
||||
log.debug("Returning result from actor invocation [%s]", result)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
|
|
@ -677,4 +692,20 @@ class RemoteServerHandler(
|
|||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
replyBuilder.build
|
||||
}
|
||||
|
||||
private def authenticateRemoteClient(request: RemoteRequestProtocol, ctx: ChannelHandlerContext) = {
|
||||
if (RemoteServer.REQUIRE_COOKIE) {
|
||||
val attachment = ctx.getAttachment
|
||||
if ((attachment ne null) &&
|
||||
attachment.isInstanceOf[String] &&
|
||||
attachment.asInstanceOf[String] == CHANNEL_INIT) {
|
||||
val clientAddress = ctx.getChannel.getRemoteAddress.toString
|
||||
if (!request.hasCookie) throw new SecurityException(
|
||||
"The remote client [" + clientAddress + "] does not have a secure cookie.")
|
||||
if (!(request.getCookie == RemoteServer.SECURE_COOKIE.get)) throw new SecurityException(
|
||||
"The remote client [" + clientAddress + "] secure cookie is not the same as remote server secure cookie")
|
||||
log.info("Remote client [%s] successfully authenticated using secure cookie", clientAddress)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue