Added TypedActor and TypedTransactor base classes.
Renamed ActiveObject factory object to TypedActor. Improved network protocol for TypedActor. Remote TypedActors now identified by UUID.
This commit is contained in:
parent
3f0fba4654
commit
add77029e0
84 changed files with 2278 additions and 1445 deletions
|
|
@ -55,7 +55,7 @@ object RemoteNode extends RemoteServer
|
|||
|
||||
/**
|
||||
* For internal use only.
|
||||
* Holds configuration variables, remote actors, remote active objects and remote servers.
|
||||
* Holds configuration variables, remote actors, remote typed actors and remote servers.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
|
|
@ -104,7 +104,7 @@ object RemoteServer {
|
|||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
|
||||
}
|
||||
|
||||
private[akka] def registerActiveObject(address: InetSocketAddress, name: String, activeObject: AnyRef) = guard.withWriteGuard {
|
||||
private[akka] def registerTypedActor(address: InetSocketAddress, name: String, activeObject: AnyRef) = guard.withWriteGuard {
|
||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).activeObjects.put(name, activeObject)
|
||||
}
|
||||
|
||||
|
|
@ -228,7 +228,7 @@ class RemoteServer extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: register active object in RemoteServer as well
|
||||
// TODO: register typed actor in RemoteServer as well
|
||||
|
||||
/**
|
||||
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
|
||||
|
|
@ -360,139 +360,88 @@ class RemoteServerHandler(
|
|||
|
||||
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
log.debug("Received RemoteRequestProtocol[\n%s]", request.toString)
|
||||
if (request.getIsActor) dispatchToActor(request, channel)
|
||||
else dispatchToActiveObject(request, channel)
|
||||
val actorType = request.getActorInfo.getActorType
|
||||
if (actorType == ActorType.SCALA_ACTOR) dispatchToActor(request, channel)
|
||||
else if (actorType == ActorType.JAVA_ACTOR) throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported")
|
||||
else if (actorType == ActorType.TYPED_ACTOR) dispatchToTypedActor(request, channel)
|
||||
else throw new IllegalActorStateException("Unknown ActorType [" + actorType + "]")
|
||||
}
|
||||
|
||||
private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
log.debug("Dispatching to remote actor [%s:%s]", request.getTarget, request.getUuid)
|
||||
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
|
||||
val actorInfo = request.getActorInfo
|
||||
log.debug("Dispatching to remote actor [%s:%s]", actorInfo.getTarget, actorInfo.getUuid)
|
||||
|
||||
val actorRef = createActor(actorInfo)
|
||||
actorRef.start
|
||||
|
||||
val message = MessageSerializer.deserialize(request.getMessage)
|
||||
val sender =
|
||||
if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
|
||||
else None
|
||||
|
||||
if (request.getIsOneWay) actorRef.!(message)(sender)
|
||||
else {
|
||||
try {
|
||||
val resultOrNone = (actorRef.!!(message)(sender)).as[AnyRef]
|
||||
val result = if (resultOrNone.isDefined) resultOrNone.get else null
|
||||
|
||||
log.debug("Returning result from actor invocation [%s]", result)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setMessage(MessageSerializer.serialize(result))
|
||||
.setIsSuccessful(true)
|
||||
.setIsActor(true)
|
||||
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
val replyMessage = replyBuilder.build
|
||||
channel.write(replyMessage)
|
||||
channel.write(replyBuilder.build)
|
||||
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
log.error(e, "Could not invoke remote actor [%s]", request.getTarget)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
|
||||
.setIsSuccessful(false)
|
||||
.setIsActor(true)
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
val replyMessage = replyBuilder.build
|
||||
channel.write(replyMessage)
|
||||
case e: Throwable => channel.write(createErrorReplyMessage(e, request, true))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def dispatchToActiveObject(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val activeObject = createActiveObject(request.getTarget, request.getTimeout)
|
||||
private def dispatchToTypedActor(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
val actorInfo = request.getActorInfo
|
||||
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||
log.debug("Dispatching to remote typed actor [%s :: %s]", typedActorInfo.getMethod, typedActorInfo.getInterface)
|
||||
val activeObject = createTypedActor(actorInfo)
|
||||
|
||||
val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList
|
||||
val argClasses = args.map(_.getClass)
|
||||
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.getTimeout)
|
||||
|
||||
try {
|
||||
val messageReceiver = activeObject.getClass.getDeclaredMethod(
|
||||
request.getMethod, unescapedArgClasses: _*)
|
||||
if (request.getIsOneWay) messageReceiver.invoke(activeObject, unescapedArgs: _*)
|
||||
val messageReceiver = activeObject.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*)
|
||||
if (request.getIsOneWay) messageReceiver.invoke(activeObject, args: _*)
|
||||
else {
|
||||
val result = messageReceiver.invoke(activeObject, unescapedArgs: _*)
|
||||
log.debug("Returning result from remote active object invocation [%s]", result)
|
||||
val result = messageReceiver.invoke(activeObject, args: _*)
|
||||
log.debug("Returning result from remote typed actor invocation [%s]", result)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setMessage(MessageSerializer.serialize(result))
|
||||
.setIsSuccessful(true)
|
||||
.setIsActor(false)
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
val replyMessage = replyBuilder.build
|
||||
channel.write(replyMessage)
|
||||
channel.write(replyBuilder.build)
|
||||
}
|
||||
} catch {
|
||||
case e: InvocationTargetException =>
|
||||
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(ExceptionProtocol.newBuilder.setClassname(e.getCause.getClass.getName).setMessage(e.getCause.getMessage).build)
|
||||
.setIsSuccessful(false)
|
||||
.setIsActor(false)
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
val replyMessage = replyBuilder.build
|
||||
channel.write(replyMessage)
|
||||
case e: Throwable =>
|
||||
log.error(e, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
|
||||
.setIsSuccessful(false)
|
||||
.setIsActor(false)
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
val replyMessage = replyBuilder.build
|
||||
channel.write(replyMessage)
|
||||
case e: InvocationTargetException => channel.write(createErrorReplyMessage(e.getCause, request, false))
|
||||
case e: Throwable => channel.write(createErrorReplyMessage(e, request, false))
|
||||
}
|
||||
}
|
||||
|
||||
private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]], timeout: Long) = {
|
||||
val unescapedArgs = new Array[AnyRef](args.size)
|
||||
val unescapedArgClasses = new Array[Class[_]](args.size)
|
||||
|
||||
val escapedArgs = for (i <- 0 until args.size) {
|
||||
val arg = args(i)
|
||||
if (arg.isInstanceOf[String] && arg.asInstanceOf[String].startsWith(AW_PROXY_PREFIX)) {
|
||||
val argString = arg.asInstanceOf[String]
|
||||
val proxyName = argString.replace(AW_PROXY_PREFIX, "")
|
||||
val activeObject = createActiveObject(proxyName, timeout)
|
||||
unescapedArgs(i) = activeObject
|
||||
unescapedArgClasses(i) = Class.forName(proxyName)
|
||||
} else {
|
||||
unescapedArgs(i) = args(i)
|
||||
unescapedArgClasses(i) = argClasses(i)
|
||||
}
|
||||
}
|
||||
(unescapedArgs, unescapedArgClasses)
|
||||
}
|
||||
|
||||
private def createActiveObject(name: String, timeout: Long): AnyRef = {
|
||||
val activeObjectOrNull = activeObjects.get(name)
|
||||
if (activeObjectOrNull eq null) {
|
||||
try {
|
||||
log.info("Creating a new remote active object [%s]", name)
|
||||
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
|
||||
else Class.forName(name)
|
||||
val newInstance = ActiveObject.newInstance(clazz, timeout).asInstanceOf[AnyRef]
|
||||
activeObjects.put(name, newInstance)
|
||||
newInstance
|
||||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not create remote active object instance")
|
||||
throw e
|
||||
}
|
||||
} else activeObjectOrNull
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
|
||||
*
|
||||
* If actor already created then just return it from the registry.
|
||||
*
|
||||
* Does not start the actor.
|
||||
*/
|
||||
private def createActor(name: String, uuid: String, timeout: Long): ActorRef = {
|
||||
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
|
||||
val name = actorInfo.getTarget
|
||||
val uuid = actorInfo.getUuid
|
||||
val timeout = actorInfo.getTimeout
|
||||
|
||||
val actorRefOrNull = actors.get(uuid)
|
||||
if (actorRefOrNull eq null) {
|
||||
try {
|
||||
|
|
@ -512,4 +461,43 @@ class RemoteServerHandler(
|
|||
}
|
||||
} else actorRefOrNull
|
||||
}
|
||||
|
||||
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val activeObjectOrNull = activeObjects.get(uuid)
|
||||
|
||||
if (activeObjectOrNull eq null) {
|
||||
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||
val interfaceClassname = typedActorInfo.getInterface
|
||||
val targetClassname = actorInfo.getTarget
|
||||
|
||||
try {
|
||||
log.info("Creating a new remote typed actor:\n\t[%s :: %s]", interfaceClassname, targetClassname)
|
||||
|
||||
val (interfaceClass, targetClass) =
|
||||
if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname),
|
||||
applicationLoader.get.loadClass(targetClassname))
|
||||
else (Class.forName(interfaceClassname), Class.forName(targetClassname))
|
||||
|
||||
val newInstance = TypedActor.newInstance(
|
||||
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
|
||||
activeObjects.put(uuid, newInstance)
|
||||
newInstance
|
||||
} catch {
|
||||
case e => log.error(e, "Could not create remote typed actor instance"); throw e
|
||||
}
|
||||
} else activeObjectOrNull
|
||||
}
|
||||
|
||||
private def createErrorReplyMessage(e: Throwable, request: RemoteRequestProtocol, isActor: Boolean): RemoteReplyProtocol = {
|
||||
val actorInfo = request.getActorInfo
|
||||
log.error(e, "Could not invoke remote typed actor [%s :: %s]", actorInfo.getTypedActorInfo.getMethod, actorInfo.getTarget)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
|
||||
.setIsSuccessful(false)
|
||||
.setIsActor(isActor)
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
replyBuilder.build
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue