Rewrite of remote protocol to use the new ActorRef protocol
This commit is contained in:
parent
17fc19b4e7
commit
fb3ae7ed2b
18 changed files with 435 additions and 582 deletions
|
|
@ -11,7 +11,7 @@ import java.util.{Map => JMap}
|
|||
|
||||
import se.scalablesolutions.akka.actor._
|
||||
import se.scalablesolutions.akka.util._
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteReply, RemoteRequest}
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol._
|
||||
import se.scalablesolutions.akka.config.Config.config
|
||||
|
||||
import org.jboss.netty.bootstrap.ServerBootstrap
|
||||
|
|
@ -199,37 +199,54 @@ class RemoteServer extends Logging {
|
|||
/**
|
||||
* Register Remote Actor by the Actor's 'id' field.
|
||||
*/
|
||||
def register(actor: ActorRef) = synchronized {
|
||||
def register(actorRef: ActorRef) = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor)
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actorRef.id, actorRef)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register Remote Actor by a specific 'id' passed as argument.
|
||||
* <p/>
|
||||
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
|
||||
*/
|
||||
def register(id: String, actor: ActorRef) = synchronized {
|
||||
def register(id: String, actorRef: ActorRef) = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, id)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor)
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actorRef)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister Remote Actor.
|
||||
* Unregister Remote Actor that is registered using its 'id' field (not custom ID).
|
||||
*/
|
||||
def unregister(actor: ActorID) = synchronized {
|
||||
def unregister(actorRef: ActorRef) = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Unregistering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id)
|
||||
log.info("Unregistering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id)
|
||||
val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
|
||||
server.actors.put(actor.id, actor)
|
||||
if (actor.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actor.uuid)
|
||||
server.actors.remove(actorRef.id)
|
||||
if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister Remote Actor by specific 'id'.
|
||||
* <p/>
|
||||
* NOTE: You need to call this method if you have registered an actor by a custom ID.
|
||||
*/
|
||||
def unregister(id: String) = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Unregistering server side remote actor with id [%s]", id)
|
||||
val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
|
||||
val actorRef = server.actors.get(id)
|
||||
server.actors.remove(id)
|
||||
if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case class Codec(encoder : ChannelHandler, decoder : ChannelHandler)
|
||||
case class Codec(encoder: ChannelHandler, decoder: ChannelHandler)
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
|
|
@ -245,7 +262,7 @@ class RemoteServerPipelineFactory(
|
|||
def getPipeline: ChannelPipeline = {
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteRequest.getDefaultInstance)
|
||||
val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val zipCodec = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder))
|
||||
|
|
@ -295,50 +312,37 @@ class RemoteServerHandler(
|
|||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
|
||||
val message = event.getMessage
|
||||
if (message eq null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event)
|
||||
if (message.isInstanceOf[RemoteRequest]) {
|
||||
handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel)
|
||||
if (message.isInstanceOf[RemoteRequestProtocol]) {
|
||||
handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel)
|
||||
}
|
||||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
event.getCause.printStackTrace
|
||||
log.error(event.getCause, "Unexpected exception from remote downstream")
|
||||
event.getChannel.close
|
||||
}
|
||||
|
||||
private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = {
|
||||
log.debug("Received RemoteRequest[\n%s]", request.toString)
|
||||
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
log.debug("Received RemoteRequestProtocol[\n%s]", request.toString)
|
||||
if (request.getIsActor) dispatchToActor(request, channel)
|
||||
else dispatchToActiveObject(request, channel)
|
||||
}
|
||||
|
||||
private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
|
||||
log.debug("Dispatching to remote actor [%s]", request.getTarget)
|
||||
val actorId = createActor(request.getTarget, request.getUuid, request.getTimeout)
|
||||
actorId.start
|
||||
|
||||
private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
log.debug("Dispatching to remote actor [%s:%s]", request.getTarget, request.getUuid)
|
||||
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
|
||||
actorRef.start
|
||||
val message = RemoteProtocolBuilder.getMessage(request)
|
||||
if (request.getIsOneWay) {
|
||||
if (request.hasSourceHostname && request.hasSourcePort) {
|
||||
// re-create the sending actor
|
||||
val targetClass = if (request.hasSourceTarget) request.getSourceTarget
|
||||
else request.getTarget
|
||||
|
||||
val remoteActorId = createActor(targetClass, request.getSourceUuid, request.getTimeout)
|
||||
if (!remoteActorId.isRunning) {
|
||||
remoteActorId.makeRemote(request.getSourceHostname, request.getSourcePort)
|
||||
remoteActorId.start
|
||||
}
|
||||
actorId.!(message)(Some(remoteActorId))
|
||||
} else {
|
||||
// couldn't find a way to reply, send the message without a source/sender
|
||||
actorId ! message
|
||||
}
|
||||
val sender = request.getSender
|
||||
if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtocol(sender)))
|
||||
} else {
|
||||
try {
|
||||
val resultOrNone = actorId !! message
|
||||
val resultOrNone = actorRef !! message
|
||||
val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
|
||||
log.debug("Returning result from actor invocation [%s]", result)
|
||||
val replyBuilder = RemoteReply.newBuilder
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setIsSuccessful(true)
|
||||
.setIsActor(true)
|
||||
|
|
@ -349,7 +353,7 @@ class RemoteServerHandler(
|
|||
} catch {
|
||||
case e: Throwable =>
|
||||
log.error(e, "Could not invoke remote actor [%s]", request.getTarget)
|
||||
val replyBuilder = RemoteReply.newBuilder
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(e.getClass.getName + "$" + e.getMessage)
|
||||
.setIsSuccessful(false)
|
||||
|
|
@ -361,7 +365,7 @@ class RemoteServerHandler(
|
|||
}
|
||||
}
|
||||
|
||||
private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = {
|
||||
private def dispatchToActiveObject(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val activeObject = createActiveObject(request.getTarget, request.getTimeout)
|
||||
|
||||
|
|
@ -377,7 +381,7 @@ class RemoteServerHandler(
|
|||
else {
|
||||
val result = messageReceiver.invoke(activeObject, unescapedArgs: _*)
|
||||
log.debug("Returning result from remote active object invocation [%s]", result)
|
||||
val replyBuilder = RemoteReply.newBuilder
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setIsSuccessful(true)
|
||||
.setIsActor(false)
|
||||
|
|
@ -389,7 +393,7 @@ class RemoteServerHandler(
|
|||
} catch {
|
||||
case e: InvocationTargetException =>
|
||||
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val replyBuilder = RemoteReply.newBuilder
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
|
||||
.setIsSuccessful(false)
|
||||
|
|
@ -399,7 +403,7 @@ class RemoteServerHandler(
|
|||
channel.write(replyMessage)
|
||||
case e: Throwable =>
|
||||
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val replyBuilder = RemoteReply.newBuilder
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(e.getClass.getName + "$" + e.getMessage)
|
||||
.setIsSuccessful(false)
|
||||
|
|
@ -454,8 +458,9 @@ class RemoteServerHandler(
|
|||
* Does not start the actor.
|
||||
*/
|
||||
private def createActor(name: String, uuid: String, timeout: Long): ActorRef = {
|
||||
val actorIdOrNull = actors.get(uuid)
|
||||
if (actorIdOrNull eq null) {
|
||||
val actorRefOrNull = actors.get(uuid)
|
||||
println("----------- ACTOR " + actorRefOrNull + " " + uuid)
|
||||
if (actorRefOrNull eq null) {
|
||||
try {
|
||||
log.info("Creating a new remote actor [%s:%s]", name, uuid)
|
||||
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
|
||||
|
|
@ -464,14 +469,14 @@ class RemoteServerHandler(
|
|||
newInstance._uuid = uuid
|
||||
newInstance.timeout = timeout
|
||||
newInstance._remoteAddress = None
|
||||
val actorId = new ActorRef(() => newInstance)
|
||||
actors.put(uuid, actorId)
|
||||
actorId
|
||||
val actorRef = new ActorRef(() => newInstance)
|
||||
actors.put(uuid, actorRef)
|
||||
actorRef
|
||||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not create remote actor instance")
|
||||
throw e
|
||||
}
|
||||
} else actorIdOrNull
|
||||
} else actorRefOrNull
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue