Merge branch 'ticket194'

This commit is contained in:
Michael Kober 2010-09-13 18:42:17 +02:00
commit 8d96c42c56
34 changed files with 1078 additions and 421 deletions

View file

@ -10,7 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.{Map => JMap}
import se.scalablesolutions.akka.actor.{
Actor, TypedActor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, RemoteActorSystemMessage}
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util._
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
@ -133,8 +133,8 @@ object RemoteServer {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
}
private[akka] def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(name, typedActor)
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
}
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
@ -271,12 +271,28 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
// TODO: register typed actor in RemoteServer as well
/**
* Register typed actor by interface name.
*/
def registerTypedActor(intfClass: Class[_], typedActor: AnyRef) : Unit = registerTypedActor(intfClass.getName, typedActor)
/**
* Register Remote Actor by the Actor's 'uuid' field. It starts the Actor if it is not started already.
* Register remote typed actor by a specific id.
* @param id custom actor id
* @param typedActor typed actor to register
*/
def register(actorRef: ActorRef): Unit = register(actorRef.id,actorRef)
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
if (!typedActors.contains(id)) {
log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port)
typedActors.put(id, typedActor)
}
}
/**
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
*/
def register(actorRef: ActorRef): Unit = register(actorRef.id, actorRef)
/**
* Register Remote Actor by a specific 'id' passed as argument.
@ -321,16 +337,25 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
/**
* Unregister Remote Typed Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterTypedActor(id: String):Unit = synchronized {
if (_isRunning) {
log.info("Unregistering server side remote typed actor with id [%s]", id)
val registeredTypedActors = typedActors()
registeredTypedActors.remove(id)
}
}
protected override def manageLifeCycleOfListeners = false
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = {
RemoteServer.actorsFor(address).actors
}
private[akka] def typedActors() : ConcurrentHashMap[String, AnyRef] = {
RemoteServer.actorsFor(address).typedActors
}
private[akka] def actors() = RemoteServer.actorsFor(address).actors
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
}
object RemoteServerSslContext {
@ -530,6 +555,32 @@ class RemoteServerHandler(
}
}
/**
* Find a registered actor by ID (default) or UUID.
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
*/
private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
val registeredActors = server.actors()
var actorRefOrNull = registeredActors get id
if (actorRefOrNull eq null) {
actorRefOrNull = registeredActors get uuid
}
actorRefOrNull
}
/**
* Find a registered typed actor by ID (default) or UUID.
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
*/
private def findTypedActorByIdOrUUid(id: String, uuid: String) : AnyRef = {
val registeredActors = server.typedActors()
var actorRefOrNull = registeredActors get id
if (actorRefOrNull eq null) {
actorRefOrNull = registeredActors get uuid
}
actorRefOrNull
}
/**
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
*
@ -538,12 +589,14 @@ class RemoteServerHandler(
* Does not start the actor.
*/
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
val uuid = actorInfo.getUuid
val ids = actorInfo.getUuid.split(':')
val uuid = ids(0)
val id = ids(1)
val name = actorInfo.getTarget
val timeout = actorInfo.getTimeout
val registeredActors = server.actors()
val actorRefOrNull = registeredActors get uuid
val actorRefOrNull = findActorByIdOrUuid(id, uuid)
if (actorRefOrNull eq null) {
try {
@ -552,9 +605,10 @@ class RemoteServerHandler(
else Class.forName(name)
val actorRef = Actor.actorOf(clazz.newInstance.asInstanceOf[Actor])
actorRef.uuid = uuid
actorRef.id = id
actorRef.timeout = timeout
actorRef.remoteAddress = None
registeredActors.put(uuid, actorRef)
server.actors.put(id, actorRef) // register by id
actorRef
} catch {
case e =>
@ -566,9 +620,11 @@ class RemoteServerHandler(
}
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
val uuid = actorInfo.getUuid
val registeredTypedActors = server.typedActors()
val typedActorOrNull = registeredTypedActors get uuid
val ids = actorInfo.getUuid.split(':')
val uuid = ids(0)
val id = ids(1)
val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid)
if (typedActorOrNull eq null) {
val typedActorInfo = actorInfo.getTypedActorInfo
@ -585,7 +641,7 @@ class RemoteServerHandler(
val newInstance = TypedActor.newInstance(
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
registeredTypedActors.put(uuid, newInstance)
server.typedActors.put(id, newInstance) // register by id
newInstance
} catch {
case e =>