Aaaaalmost there...
This commit is contained in:
parent
12aedd9fbe
commit
551f25aba8
24 changed files with 448 additions and 296 deletions
|
|
@ -11,6 +11,7 @@ import java.util.{Map => JMap}
|
|||
|
||||
import se.scalablesolutions.akka.actor.{
|
||||
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage}
|
||||
import se.scalablesolutions.akka.actor.{Uuid,uuidFrom}
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.util._
|
||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
|
||||
|
|
@ -122,19 +123,19 @@ object RemoteServer {
|
|||
}
|
||||
|
||||
private class RemoteActorSet {
|
||||
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
|
||||
private[RemoteServer] val actors = new ConcurrentHashMap[Object, ActorRef]
|
||||
private[RemoteServer] val typedActors = new ConcurrentHashMap[Object, AnyRef]
|
||||
}
|
||||
|
||||
private val guard = new ReadWriteGuard
|
||||
private val remoteActorSets = Map[Address, RemoteActorSet]()
|
||||
private val remoteServers = Map[Address, RemoteServer]()
|
||||
|
||||
private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard {
|
||||
private[akka] def registerActor(address: InetSocketAddress, uuid: Uuid, actor: ActorRef) = guard.withWriteGuard {
|
||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
|
||||
}
|
||||
|
||||
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard {
|
||||
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: Uuid, typedActor: AnyRef) = guard.withWriteGuard {
|
||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
|
||||
}
|
||||
|
||||
|
|
@ -504,7 +505,7 @@ class RemoteServerHandler(
|
|||
override def onComplete(result: AnyRef) {
|
||||
log.debug("Returning result from actor invocation [%s]", result)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setUuid(request.getUuid)
|
||||
.setMessage(MessageSerializer.serialize(result))
|
||||
.setIsSuccessful(true)
|
||||
.setIsActor(true)
|
||||
|
|
@ -548,7 +549,7 @@ class RemoteServerHandler(
|
|||
val result = messageReceiver.invoke(typedActor, args: _*)
|
||||
log.debug("Returning result from remote typed actor invocation [%s]", result)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setUuid(request.getUuid)
|
||||
.setMessage(MessageSerializer.serialize(result))
|
||||
.setIsSuccessful(true)
|
||||
.setIsActor(false)
|
||||
|
|
@ -569,7 +570,7 @@ class RemoteServerHandler(
|
|||
* Find a registered actor by ID (default) or UUID.
|
||||
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
|
||||
*/
|
||||
private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
|
||||
private def findActorByIdOrUuid(id: String, uuid: Uuid) : ActorRef = {
|
||||
val registeredActors = server.actors()
|
||||
var actorRefOrNull = registeredActors get id
|
||||
if (actorRefOrNull eq null) {
|
||||
|
|
@ -582,7 +583,7 @@ class RemoteServerHandler(
|
|||
* Find a registered typed actor by ID (default) or UUID.
|
||||
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
|
||||
*/
|
||||
private def findTypedActorByIdOrUUid(id: String, uuid: String) : AnyRef = {
|
||||
private def findTypedActorByIdOrUUid(id: String, uuid: Uuid) : AnyRef = {
|
||||
val registeredActors = server.typedActors()
|
||||
var actorRefOrNull = registeredActors get id
|
||||
if (actorRefOrNull eq null) {
|
||||
|
|
@ -599,7 +600,7 @@ class RemoteServerHandler(
|
|||
* Does not start the actor.
|
||||
*/
|
||||
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val uuid = uuidFrom(actorInfo.getUuid.getHigh,actorInfo.getUuid.getLow)
|
||||
val id = actorInfo.getId
|
||||
|
||||
val name = actorInfo.getTarget
|
||||
|
|
@ -629,7 +630,7 @@ class RemoteServerHandler(
|
|||
}
|
||||
|
||||
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val uuid = uuidFrom(actorInfo.getUuid.getHigh,actorInfo.getUuid.getLow)
|
||||
val id = actorInfo.getId
|
||||
|
||||
val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid)
|
||||
|
|
@ -664,7 +665,7 @@ class RemoteServerHandler(
|
|||
val actorInfo = request.getActorInfo
|
||||
log.error(e, "Could not invoke remote typed actor [%s :: %s]", actorInfo.getTypedActorInfo.getMethod, actorInfo.getTarget)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setUuid(request.getUuid)
|
||||
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
|
||||
.setIsSuccessful(false)
|
||||
.setIsActor(isActor)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue