First shot at re-doing akka-remote

This commit is contained in:
Viktor Klang 2010-12-14 18:22:46 +01:00
parent a1117c6935
commit c89ea0a49c
15 changed files with 321 additions and 386 deletions

View file

@ -32,6 +32,7 @@ import scala.collection.mutable.Map
import scala.reflect.BeanProperty
import akka.dispatch. {Future, DefaultCompletableFuture, CompletableFuture}
import akka.japi.Creator
import akka.remoteinterface.RemoteServerModule
/**
* Use this object if you need a single remote server on a specific node.
@ -112,45 +113,6 @@ object RemoteServer {
true
} else */false
}
private val guard = new ReadWriteGuard
private val remoteServers = Map[Address, RemoteServer]()
def serverFor(address: InetSocketAddress): Option[RemoteServer] =
serverFor(address.getHostName, address.getPort)
def serverFor(hostname: String, port: Int): Option[RemoteServer] = guard.withReadGuard {
remoteServers.get(Address(hostname, port))
}
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
serverFor(address) match {
case Some(server) => server
case None => (new RemoteServer).start(address)
}
}
private[akka] def register(hostname: String, port: Int, server: RemoteServer) = guard.withWriteGuard {
remoteServers.put(Address(hostname, port), server)
}
private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard {
remoteServers.remove(Address(hostname, port))
}
/**
* Used in REflectiveAccess
*/
private[akka] def registerActor(address: InetSocketAddress, actorRef: ActorRef) {
serverFor(address) foreach { _.register(actorRef) }
}
/**
* Used in Reflective
*/
private[akka] def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) {
serverFor(address) foreach { _.registerTypedActor(implementationClassName,proxy)}
}
}
/**
@ -190,52 +152,31 @@ case class RemoteServerClientClosed(
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServer extends Logging with ListenerManagement {
class RemoteServer extends RemoteServerModule {
import RemoteServer._
def name = "RemoteServer@" + hostname + ":" + port
private[akka] var address = Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
@volatile private[akka] var address = Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
def hostname = address.hostname
def port = address.port
def name = "RemoteServer@" + hostname + ":" + port
@volatile private var _isRunning = false
private val _isRunning = new Switch(false)
private val factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool,
Executors.newCachedThreadPool)
private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool,Executors.newCachedThreadPool)
private val bootstrap = new ServerBootstrap(factory)
// group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-remote-server")
def isRunning = _isRunning
def isRunning = _isRunning.isOn
def start: RemoteServer =
start(hostname, port, None)
def start(loader: ClassLoader): RemoteServer =
start(hostname, port, Some(loader))
def start(address: InetSocketAddress): RemoteServer =
start(address.getHostName, address.getPort, None)
def start(address: InetSocketAddress, loader: ClassLoader): RemoteServer =
start(address.getHostName, address.getPort, Some(loader))
def start(_hostname: String, _port: Int): RemoteServer =
start(_hostname, _port, None)
private def start(_hostname: String, _port: Int, loader: ClassLoader): RemoteServer =
start(_hostname, _port, Some(loader))
private def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized {
def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServer = guard withGuard {
try {
if (!_isRunning) {
_isRunning switchOn {
address = Address(_hostname,_port)
log.slf4j.info("Starting remote server at [{}:{}]", hostname, port)
RemoteServer.register(hostname, port, this)
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, this)
bootstrap.setPipelineFactory(pipelineFactory)
@ -245,7 +186,6 @@ class RemoteServer extends Logging with ListenerManagement {
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
_isRunning = true
notifyListeners(RemoteServerStarted(this))
}
} catch {
@ -256,10 +196,9 @@ class RemoteServer extends Logging with ListenerManagement {
this
}
def shutdown = synchronized {
if (_isRunning) {
def shutdown = guard withGuard {
_isRunning switchOff {
try {
RemoteServer.unregister(hostname, port)
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources
@ -271,70 +210,49 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
/**
* Register typed actor by interface name.
*/
def registerTypedActor(intfClass: Class[_], typedActor: AnyRef) : Unit = registerTypedActor(intfClass.getName, typedActor)
/**
* Register remote typed actor by a specific id.
* @param id custom actor id
* @param typedActor typed actor to register
*/
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
def registerTypedActor(id: String, typedActor: AnyRef): Unit = guard withGuard {
log.slf4j.debug("Registering server side remote typed actor [{}] with id [{}]", typedActor.getClass.getName, id)
if (id.startsWith(UUID_PREFIX)) registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid)
else registerTypedActor(id, typedActor, typedActors)
}
/**
* Register typed actor by interface name.
*/
def registerTypedPerSessionActor(intfClass: Class[_], factory: => AnyRef) : Unit = registerTypedActor(intfClass.getName, factory)
/**
* Register typed actor by interface name.
* Java API
*/
def registerTypedPerSessionActor(intfClass: Class[_], factory: Creator[AnyRef]) : Unit = registerTypedActor(intfClass.getName, factory)
/**
* Register remote typed actor by a specific id.
* @param id custom actor id
* @param typedActor typed actor to register
*/
def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit = synchronized {
def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit = guard withGuard {
log.slf4j.debug("Registering server side typed remote session actor with id [{}]", id)
registerTypedPerSessionActor(id, () => factory, typedActorsFactories)
}
/**
* Register remote typed actor by a specific id.
* @param id custom actor id
* @param typedActor typed actor to register
* Java API
*/
def registerTypedPerSessionActor(id: String, factory: Creator[AnyRef]): Unit = synchronized {
log.slf4j.debug("Registering server side typed remote session actor with id [{}]", id)
registerTypedPerSessionActor(id, factory.create _, typedActorsFactories)
}
/**
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
*/
def register(actorRef: ActorRef): Unit = register(actorRef.id, actorRef)
/**
* Register Remote Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def register(id: String, actorRef: ActorRef): Unit = synchronized {
def register(id: String, actorRef: ActorRef): Unit = guard withGuard {
log.slf4j.debug("Registering server side remote actor [{}] with id [{}]", actorRef.actorClass.getName, id)
if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid)
else register(id, actorRef, actors)
}
def registerByUuid(actorRef: ActorRef): Unit = guard withGuard {
register(actorRef.uuid.toString, actorRef, actorsByUuid)
}
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
if (_isRunning.isOn) {
registry.put(id, actorRef) //TODO change to putIfAbsent
if (!actorRef.isRunning) actorRef.start
}
}
/**
* Register Remote Session Actor by a specific 'id' passed as argument.
* <p/>
@ -345,44 +263,26 @@ class RemoteServer extends Logging with ListenerManagement {
registerPerSession(id, () => factory, actorsFactories)
}
/**
* Register Remote Session Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
* Java API
*/
def registerPerSession(id: String, factory: Creator[ActorRef]): Unit = synchronized {
log.slf4j.debug("Registering server side remote session actor with id [{}]", id)
registerPerSession(id, factory.create _, actorsFactories)
}
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
if (_isRunning) {
registry.put(id, actorRef) //TODO change to putIfAbsent
if (!actorRef.isRunning) actorRef.start
}
}
private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) {
if (_isRunning)
if (_isRunning.isOn)
registry.put(id, factory) //TODO change to putIfAbsent
}
private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) {
if (_isRunning)
if (_isRunning.isOn)
registry.put(id, typedActor) //TODO change to putIfAbsent
}
private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) {
if (_isRunning)
if (_isRunning.isOn)
registry.put(id, factory) //TODO change to putIfAbsent
}
/**
* Unregister Remote Actor that is registered using its 'id' field (not custom ID).
*/
def unregister(actorRef: ActorRef):Unit = synchronized {
if (_isRunning) {
def unregister(actorRef: ActorRef): Unit = guard withGuard {
if (_isRunning.isOn) {
log.slf4j.debug("Unregistering server side remote actor [{}] with id [{}:{}]", Array[AnyRef](actorRef.actorClass.getName, actorRef.id, actorRef.uuid))
actors.remove(actorRef.id, actorRef)
actorsByUuid.remove(actorRef.uuid, actorRef)
@ -394,8 +294,8 @@ class RemoteServer extends Logging with ListenerManagement {
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregister(id: String):Unit = synchronized {
if (_isRunning) {
def unregister(id: String): Unit = guard withGuard {
if (_isRunning.isOn) {
log.slf4j.info("Unregistering server side remote actor with id [{}]", id)
if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length))
else {
@ -411,8 +311,8 @@ class RemoteServer extends Logging with ListenerManagement {
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterPerSession(id: String):Unit = {
if (_isRunning) {
def unregisterPerSession(id: String): Unit = {
if (_isRunning.isOn) {
log.slf4j.info("Unregistering server side remote session actor with id [{}]", id)
actorsFactories.remove(id)
}
@ -423,8 +323,8 @@ class RemoteServer extends Logging with ListenerManagement {
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterTypedActor(id: String):Unit = synchronized {
if (_isRunning) {
def unregisterTypedActor(id: String):Unit = guard withGuard {
if (_isRunning.isOn) {
log.slf4j.info("Unregistering server side remote typed actor with id [{}]", id)
if (id.startsWith(UUID_PREFIX)) typedActorsByUuid.remove(id.substring(UUID_PREFIX.length))
else typedActors.remove(id)
@ -436,8 +336,8 @@ class RemoteServer extends Logging with ListenerManagement {
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterTypedPerSessionActor(id: String):Unit = {
if (_isRunning) {
def unregisterTypedPerSessionActor(id: String): Unit = {
if (_isRunning.isOn) {
typedActorsFactories.remove(id)
}
}
@ -446,7 +346,6 @@ class RemoteServer extends Logging with ListenerManagement {
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] def actors = ActorRegistry.actors(address)
private[akka] def actorsByUuid = ActorRegistry.actorsByUuid(address)
private[akka] def actorsFactories = ActorRegistry.actorsFactories(address)
@ -838,7 +737,6 @@ class RemoteServerHandler(
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
actorRef.id = id
actorRef.timeout = timeout
actorRef.remoteAddress = None
server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid
actorRef
} catch {