Rewrite of remote protocol to use the new ActorRef protocol

This commit is contained in:
Jonas Bonér 2010-05-07 11:19:19 +02:00
parent f951d2c3de
commit cb2e39cc97
18 changed files with 435 additions and 582 deletions

View file

@ -11,7 +11,7 @@ import java.util.{Map => JMap}
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.util._
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteReply, RemoteRequest}
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol._
import se.scalablesolutions.akka.config.Config.config
import org.jboss.netty.bootstrap.ServerBootstrap
@ -199,37 +199,54 @@ class RemoteServer extends Logging {
/**
* Register Remote Actor by the Actor's 'id' field.
*/
def register(actor: ActorRef) = synchronized {
def register(actorRef: ActorRef) = synchronized {
if (_isRunning) {
log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor)
log.info("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actorRef.id, actorRef)
}
}
/**
* Register Remote Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def register(id: String, actor: ActorRef) = synchronized {
def register(id: String, actorRef: ActorRef) = synchronized {
if (_isRunning) {
log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, id)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor)
log.info("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actorRef)
}
}
/**
* Unregister Remote Actor.
* Unregister Remote Actor that is registered using its 'id' field (not custom ID).
*/
def unregister(actor: ActorID) = synchronized {
def unregister(actorRef: ActorRef) = synchronized {
if (_isRunning) {
log.info("Unregistering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id)
log.info("Unregistering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id)
val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
server.actors.put(actor.id, actor)
if (actor.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actor.uuid)
server.actors.remove(actorRef.id)
if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid)
}
}
/**
* Unregister Remote Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregister(id: String) = synchronized {
if (_isRunning) {
log.info("Unregistering server side remote actor with id [%s]", id)
val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
val actorRef = server.actors.get(id)
server.actors.remove(id)
if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid)
}
}
}
case class Codec(encoder : ChannelHandler, decoder : ChannelHandler)
case class Codec(encoder: ChannelHandler, decoder: ChannelHandler)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -245,7 +262,7 @@ class RemoteServerPipelineFactory(
def getPipeline: ChannelPipeline = {
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteRequest.getDefaultInstance)
val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val zipCodec = RemoteServer.COMPRESSION_SCHEME match {
case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder))
@ -295,50 +312,37 @@ class RemoteServerHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
val message = event.getMessage
if (message eq null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event)
if (message.isInstanceOf[RemoteRequest]) {
handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel)
if (message.isInstanceOf[RemoteRequestProtocol]) {
handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel)
}
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
event.getCause.printStackTrace
log.error(event.getCause, "Unexpected exception from remote downstream")
event.getChannel.close
}
private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = {
log.debug("Received RemoteRequest[\n%s]", request.toString)
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
log.debug("Received RemoteRequestProtocol[\n%s]", request.toString)
if (request.getIsActor) dispatchToActor(request, channel)
else dispatchToActiveObject(request, channel)
}
private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
log.debug("Dispatching to remote actor [%s]", request.getTarget)
val actorId = createActor(request.getTarget, request.getUuid, request.getTimeout)
actorId.start
private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = {
log.debug("Dispatching to remote actor [%s:%s]", request.getTarget, request.getUuid)
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
actorRef.start
val message = RemoteProtocolBuilder.getMessage(request)
if (request.getIsOneWay) {
if (request.hasSourceHostname && request.hasSourcePort) {
// re-create the sending actor
val targetClass = if (request.hasSourceTarget) request.getSourceTarget
else request.getTarget
val remoteActorId = createActor(targetClass, request.getSourceUuid, request.getTimeout)
if (!remoteActorId.isRunning) {
remoteActorId.makeRemote(request.getSourceHostname, request.getSourcePort)
remoteActorId.start
}
actorId.!(message)(Some(remoteActorId))
} else {
// couldn't find a way to reply, send the message without a source/sender
actorId ! message
}
val sender = request.getSender
if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtocol(sender)))
} else {
try {
val resultOrNone = actorId !! message
val resultOrNone = actorRef !! message
val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReply.newBuilder
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setIsSuccessful(true)
.setIsActor(true)
@ -349,7 +353,7 @@ class RemoteServerHandler(
} catch {
case e: Throwable =>
log.error(e, "Could not invoke remote actor [%s]", request.getTarget)
val replyBuilder = RemoteReply.newBuilder
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setException(e.getClass.getName + "$" + e.getMessage)
.setIsSuccessful(false)
@ -361,7 +365,7 @@ class RemoteServerHandler(
}
}
private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = {
private def dispatchToActiveObject(request: RemoteRequestProtocol, channel: Channel) = {
log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget)
val activeObject = createActiveObject(request.getTarget, request.getTimeout)
@ -377,7 +381,7 @@ class RemoteServerHandler(
else {
val result = messageReceiver.invoke(activeObject, unescapedArgs: _*)
log.debug("Returning result from remote active object invocation [%s]", result)
val replyBuilder = RemoteReply.newBuilder
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setIsSuccessful(true)
.setIsActor(false)
@ -389,7 +393,7 @@ class RemoteServerHandler(
} catch {
case e: InvocationTargetException =>
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
val replyBuilder = RemoteReply.newBuilder
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
.setIsSuccessful(false)
@ -399,7 +403,7 @@ class RemoteServerHandler(
channel.write(replyMessage)
case e: Throwable =>
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
val replyBuilder = RemoteReply.newBuilder
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setException(e.getClass.getName + "$" + e.getMessage)
.setIsSuccessful(false)
@ -454,8 +458,9 @@ class RemoteServerHandler(
* Does not start the actor.
*/
private def createActor(name: String, uuid: String, timeout: Long): ActorRef = {
val actorIdOrNull = actors.get(uuid)
if (actorIdOrNull eq null) {
val actorRefOrNull = actors.get(uuid)
println("----------- ACTOR " + actorRefOrNull + " " + uuid)
if (actorRefOrNull eq null) {
try {
log.info("Creating a new remote actor [%s:%s]", name, uuid)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
@ -464,14 +469,14 @@ class RemoteServerHandler(
newInstance._uuid = uuid
newInstance.timeout = timeout
newInstance._remoteAddress = None
val actorId = new ActorRef(() => newInstance)
actors.put(uuid, actorId)
actorId
val actorRef = new ActorRef(() => newInstance)
actors.put(uuid, actorRef)
actorRef
} catch {
case e =>
log.error(e, "Could not create remote actor instance")
throw e
}
} else actorIdOrNull
} else actorRefOrNull
}
}