Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2011-03-14 10:45:55 +01:00
commit 978ec62c1a
14 changed files with 714 additions and 169 deletions

View file

@ -361,28 +361,18 @@ class ActiveRemoteClientPipelineFactory(
client: ActiveRemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*)
lazy val engine = {
val e = RemoteServerSslContext.client.createSSLEngine()
e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible?
e.setUseClientMode(true)
e
}
val ssl = if (RemoteServerSettings.SECURE) join(new SslHandler(engine)) else join()
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
case _ => (join(), join())
val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match {
case "zlib" => (new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ => (Nil,Nil)
}
val remoteClient = new ActiveRemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient)
val stages: List[ChannelHandler] = timeout :: dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteClient :: Nil
new StaticChannelPipeline(stages: _*)
}
}
@ -465,20 +455,8 @@ class ActiveRemoteClientHandler(
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
def connect = {
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
client.resetReconnectionTimeWindow
}
if (RemoteServerSettings.SECURE) {
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
sslHandler.handshake.addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) connect
else throw new RemoteClientException("Could not establish SSL handshake", client.module, client.remoteAddress)
}
})
} else connect
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
client.resetReconnectionTimeWindow
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
@ -533,7 +511,7 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
if (optimizeLocalScoped_?) {
val home = this.address
if ((host == home.getHostName || host == home.getAddress.getHostAddress) && port == home.getPort)//TODO: switch to InetSocketAddress.equals?
if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort)//TODO: switch to InetSocketAddress.equals?
return new LocalActorRef(factory, None) // Code is much simpler with return
}
@ -751,21 +729,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
if (_isRunning.isOn) typedActorsFactories.remove(id)
}
object RemoteServerSslContext {
import javax.net.ssl.SSLContext
val (client, server) = {
val protocol = "TLS"
//val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509")
//val store = KeyStore.getInstance("JKS")
val s = SSLContext.getInstance(protocol)
s.init(null, null, null)
val c = SSLContext.getInstance(protocol)
c.init(null, null, null)
(c, s)
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -777,27 +740,17 @@ class RemoteServerPipelineFactory(
import RemoteServerSettings._
def getPipeline: ChannelPipeline = {
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*)
lazy val engine = {
val e = RemoteServerSslContext.server.createSSLEngine()
e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible?
e.setUseClientMode(false)
e
}
val ssl = if(SECURE) join(new SslHandler(engine)) else join()
val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val (enc, dec) = COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
case _ => (join(), join())
case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ => (Nil, Nil)
}
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer)
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteServer :: Nil
new StaticChannelPipeline(stages: _*)
}
}
@ -847,20 +800,7 @@ class RemoteServerHandler(
val clientAddress = getClientAddress(ctx)
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]())
if (SECURE) {
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
// Begin handshake.
sslHandler.handshake().addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) {
openChannels.add(future.getChannel)
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
} else future.getChannel.close
}
})
} else {
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
}
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
if (REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication
}