started working on ticket 194

This commit is contained in:
Michael Kober 2010-09-06 10:15:44 +02:00
parent 2478e5d6ad
commit 60d4010421
8 changed files with 299 additions and 35 deletions

View file

@ -120,9 +120,12 @@ object RemoteServer {
}
}
private class RemoteActorSet {
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
// FIXME private
class RemoteActorSet {
//private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
val actors = new ConcurrentHashMap[String, ActorRef]
//private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
val typedActors = new ConcurrentHashMap[String, AnyRef]
}
private val guard = new ReadWriteGuard
@ -130,11 +133,14 @@ object RemoteServer {
private val remoteServers = Map[Address, RemoteServer]()
private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
// FIXME
//actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
val actors = actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors
actors.put(uuid, actor)
}
private[akka] def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(name, typedActor)
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: TypedActor) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
}
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
@ -159,7 +165,9 @@ object RemoteServer {
remoteServers.remove(Address(hostname, port))
}
private def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
// FIXME
def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
println("##### actorsFor SIZE=" + remoteActorSets.size)
remoteActorSets.getOrElseUpdate(remoteServerAddress,new RemoteActorSet)
}
}
@ -195,7 +203,7 @@ class RemoteServer extends Logging with ListenerManagement {
private[akka] var hostname = RemoteServer.HOSTNAME
private[akka] var port = RemoteServer.PORT
@volatile private var _isRunning = false
private val factory = new NioServerSocketChannelFactory(
@ -270,7 +278,22 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
// TODO: register typed actor in RemoteServer as well
// FIXME: register typed actor in RemoteServer as well
def registerTypedActor(id: String, actorRef: AnyRef): Unit = synchronized {
val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
if (!typedActors.contains(id)) {
log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", actorRef.getClass.getName, id, hostname, port)
typedActors.put(id, actorRef)
}
}
private def actors() : ConcurrentHashMap[String, ActorRef] = {
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors
}
private def typedActors() : ConcurrentHashMap[String, AnyRef] = {
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
}
/**
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
@ -285,6 +308,7 @@ class RemoteServer extends Logging with ListenerManagement {
def register(id: String, actorRef: ActorRef): Unit = synchronized {
if (_isRunning) {
val actors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors
println("___ register ___ " + actors.hashCode + " hostname=" + hostname + " port="+ port)
if (!actors.contains(id)) {
if (!actorRef.isRunning) actorRef.start
log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
@ -320,6 +344,8 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
//FIXME: unregister typed Actor
protected override def manageLifeCycleOfListeners = false
protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)