completed protobuf protocol for remoting

This commit is contained in:
jboner 2009-07-18 00:16:32 +02:00
parent a4d22af64b
commit f26110e55c
23 changed files with 795 additions and 385 deletions

View file

@ -9,18 +9,18 @@ import java.net.InetSocketAddress
import java.util.concurrent.{ConcurrentHashMap, Executors}
import kernel.actor._
import kernel.stm.TransactionManagement
import kernel.util.{JSONSerializer, Logging}
import kernel.util.{Serializer, ScalaJSONSerializer, JavaJSONSerializer, Logging}
import protobuf.RemoteProtocol
import protobuf.RemoteProtocol.{RemoteReply, RemoteRequest}
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
import org.jboss.netty.handler.codec.serialization.ObjectDecoder
import org.jboss.netty.handler.codec.serialization.ObjectEncoder
import protobuf.RemoteProtocol.{RemoteReply, RemoteRequest}
import com.google.protobuf.ByteString
class RemoteServer extends Logging {
def start = RemoteServer.start
}
@ -41,8 +41,8 @@ object RemoteServer extends Logging {
private val bootstrap = new ServerBootstrap(factory)
// FIXME provide different codecs (Thrift, Avro, Protobuf, JSON)
private val handler = new AkkaServerHandler
bootstrap.getPipeline.addLast("handler", handler)
private val handler = new RemoteServerHandler
bootstrap.setPipelineFactory(new RemoteServerPipelineFactory)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
@ -57,12 +57,24 @@ object RemoteServer extends Logging {
}
}
class RemoteServerPipelineFactory extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val p = Channels.pipeline()
p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
p.addLast("protobufDecoder", new ProtobufDecoder(RemoteProtocol.RemoteRequest.getDefaultInstance))
p.addLast("frameEncoder", new LengthFieldPrepender(4))
p.addLast("protobufEncoder", new ProtobufEncoder)
p.addLast("handler", new RemoteServerHandler)
p
}
}
@ChannelPipelineCoverage { val value = "all" }
class AkkaServerHandler extends SimpleChannelUpstreamHandler with Logging {
class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
private val activeObjectFactory = new ActiveObjectFactory
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
log.debug(event.toString)
@ -70,20 +82,6 @@ class AkkaServerHandler extends SimpleChannelUpstreamHandler with Logging {
super.handleUpstream(ctx, event)
}
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
//event.getChannel.getPipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
//event.getChannel.getPipeline.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.LocalTimes.getDefaultInstance()));
//event.getChannel.getPipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
//event.getChannel.getPipeline.addLast("protobufEncoder", new ProtobufEncoder());
event.getChannel.getPipeline.addFirst("encoder", new ObjectEncoder)
event.getChannel.getPipeline.addFirst("decoder", new ObjectDecoder)
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
//e.getChannel.write(firstMessage)
}
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
val message = event.getMessage
if (message == null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event)
@ -91,82 +89,100 @@ class AkkaServerHandler extends SimpleChannelUpstreamHandler with Logging {
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
event.getCause.printStackTrace
log.error("Unexpected exception from remote downstream: %s", event.getCause)
event.getCause.printStackTrace
event.getChannel.close
}
private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = {
log.debug(request.toString)
if (request.isActor) dispatchToActor(request, channel)
log.debug("Received RemoteRequest[\n%s]", request.toString)
if (request.getIsActor) dispatchToActor(request, channel)
else dispatchToActiveObject(request, channel)
}
private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
log.debug("Dispatching to remote actor [%s]", request.target)
val actor = createActor(request.target, request.timeout)
log.debug("Dispatching to remote actor [%s]", request.getTarget)
val actor = createActor(request.getTarget, request.getTimeout)
actor.start
val messageBytes = request.getMessage
val messageType = request.getMessageType
val messageClass = Class.forName(messageType)
val message = JSONSerializer.in(messageBytes, messageClass)
if (request.isOneWay) actor ! message
val messageClass = Class.forName(request.getMessageType)
val message = ScalaJSONSerializer.in(request.getMessage.toByteArray, Some(messageClass))
if (request.getIsOneWay) actor ! message
else {
try {
val resultOrNone = actor !! message
val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
log.debug("Returning result from actor invocation [%s]", result)
val replyMessage = JSONSerializer.out(result)
val reply = RemoteReply.newBuilder
.setId(request.getId)
.setMessage(replyMessage)
.setMessageType(result.getClass.getName)
.setIsSuccessful(true)
.setSupervisorUuid(request.getSupervisorUuid)
.build
channel.write(reply)
val replyMessage = ScalaJSONSerializer.out(result)
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setMessage(ByteString.copyFrom(replyMessage))
.setMessageType(result.getClass.getName)
.setIsSuccessful(true)
.setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
} catch {
case e: Throwable =>
log.error("Could not invoke remote actor [%s] due to: %s", request.target, e)
log.error("Could not invoke remote actor [%s] due to: %s", request.getTarget, e)
e.printStackTrace
val reply = RemoteReply.newBuilder
.setId(request.getId)
.setException(e.toString)
.setIsSuccessful(false)
.setSupervisorUuid(request.getSupervisorUuid)
.build
channel.write(reply)
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setException(e.getClass.getName + "$" + e.getMessage)
.setIsSuccessful(false)
.setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
}
}
}
private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = {
log.debug("Dispatching to remote active object [%s :: %s]", request.method, request.target)
val activeObject = createActiveObject(request.target, request.timeout)
log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget)
val activeObject = createActiveObject(request.getTarget, request.getTimeout)
val args = request.message.asInstanceOf[scala.List[AnyRef]]
val args: scala.List[AnyRef] = JavaJSONSerializer.in(request.getMessage.toByteArray, Some(classOf[scala.List[AnyRef]]))
val argClasses = args.map(_.getClass)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.timeout)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.getTimeout)
//continueTransaction(request)
try {
val messageReceiver = activeObject.getClass.getDeclaredMethod(request.method, unescapedArgClasses: _*)
if (request.isOneWay) messageReceiver.invoke(activeObject, unescapedArgs: _*)
val messageReceiver = activeObject.getClass.getDeclaredMethod(request.getMethod, unescapedArgClasses: _*)
if (request.getIsOneWay) messageReceiver.invoke(activeObject, unescapedArgs: _*)
else {
val result = messageReceiver.invoke(activeObject, unescapedArgs: _*)
log.debug("Returning result from remote active object invocation [%s]", result)
//channel.write(request.newReplyWithMessage(result, TransactionManagement.threadBoundTx.get))
channel.write(request.newReplyWithMessage(result, null))
val replyMessage = JavaJSONSerializer.out(result)
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setMessage(ByteString.copyFrom(replyMessage))
.setMessageType(result.getClass.getName)
.setIsSuccessful(true)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
}
} catch {
case e: InvocationTargetException =>
log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.method, request.target, e.getCause)
log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.getMethod, request.getTarget, e.getCause)
e.getCause.printStackTrace
channel.write(request.newReplyWithException(e.getCause))
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
.setException(e.getCause.toString)
.setIsSuccessful(false)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
case e: Throwable =>
log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.method, request.target, e)
log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.getMethod, request.getTarget, e)
e.printStackTrace
channel.write(request.newReplyWithException(e))
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setException(e.getClass.getName + "$" + e.getMessage)
.setIsSuccessful(false)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
}
}
@ -184,8 +200,10 @@ class AkkaServerHandler extends SimpleChannelUpstreamHandler with Logging {
val unescapedArgClasses = new Array[Class[_]](args.size)
val escapedArgs = for (i <- 0 until args.size) {
if (args(i).isInstanceOf[ProxyWrapper]) {
val proxyName = args(i).asInstanceOf[ProxyWrapper].proxyName
val arg = args(i)
if (arg.isInstanceOf[String] && arg.asInstanceOf[String] == "$$ProxiedByAW") {
val argString = arg.asInstanceOf[String]
val proxyName = argString.substring(argString.indexOf("$$ProxiedByAW"), argString.length)
val activeObject = createActiveObject(proxyName, timeout)
unescapedArgs(i) = activeObject
unescapedArgClasses(i) = Class.forName(proxyName)