clean up and stabilization, getting ready for M1

This commit is contained in:
jboner 2009-07-12 23:08:17 +02:00
parent 6a65c67ca7
commit 95d598fb66
119 changed files with 1839 additions and 5140 deletions

View file

@ -15,6 +15,10 @@ import kernel.util.Logging
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
import org.jboss.netty.handler.codec.serialization.ObjectDecoder
import org.jboss.netty.handler.codec.serialization.ObjectEncoder
@ -38,7 +42,7 @@ object RemoteServer extends Logging {
private val bootstrap = new ServerBootstrap(factory)
// FIXME provide different codecs (Thrift, Avro, Protobuf, JSON)
private val handler = new ObjectServerHandler
private val handler = new AkkaServerHandler
bootstrap.getPipeline.addLast("handler", handler)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
@ -55,7 +59,7 @@ object RemoteServer extends Logging {
}
@ChannelPipelineCoverage { val value = "all" }
class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
class AkkaServerHandler extends SimpleChannelUpstreamHandler with Logging {
private val activeObjectFactory = new ActiveObjectFactory
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
@ -68,6 +72,11 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
}
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
//event.getChannel.getPipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
//event.getChannel.getPipeline.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.LocalTimes.getDefaultInstance()));
//event.getChannel.getPipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
//event.getChannel.getPipeline.addLast("protobufEncoder", new ProtobufEncoder());
event.getChannel.getPipeline.addFirst("encoder", new ObjectEncoder)
event.getChannel.getPipeline.addFirst("decoder", new ObjectDecoder)
}
@ -145,6 +154,7 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
}
}
/*
private def continueTransaction(request: RemoteRequest) = {
val tx = request.tx
if (tx.isDefined) {
@ -152,7 +162,7 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
TransactionManagement.threadBoundTx.set(tx)
} else TransactionManagement.threadBoundTx.set(None)
}
*/
private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]], timeout: Long) = {
val unescapedArgs = new Array[AnyRef](args.size)
val unescapedArgClasses = new Array[Class[_]](args.size)