Now doing a 'reply(..)' to remote sender after receiving a remote message through '!' works. Added tests.
Also removed the Logging trait from Actor for lower memory footprint.
This commit is contained in:
parent
94d472eac2
commit
91fe6a3a98
11 changed files with 191 additions and 147 deletions
|
|
@ -33,7 +33,7 @@ import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
|
|||
* <pre>
|
||||
* RemoteNode.start(hostname, port)
|
||||
* </pre>
|
||||
*
|
||||
*
|
||||
* You can specify the class loader to use to load the remote actors.
|
||||
* <pre>
|
||||
* RemoteNode.start(hostname, port, classLoader)
|
||||
|
|
@ -87,15 +87,15 @@ object RemoteServer {
|
|||
that.asInstanceOf[Address].port == port
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class RemoteActorSet {
|
||||
val actors = new ConcurrentHashMap[String, Actor]
|
||||
val activeObjects = new ConcurrentHashMap[String, AnyRef]
|
||||
val activeObjects = new ConcurrentHashMap[String, AnyRef]
|
||||
}
|
||||
|
||||
private val remoteActorSets = new ConcurrentHashMap[Address, RemoteActorSet]
|
||||
private val remoteServers = new ConcurrentHashMap[Address, RemoteServer]
|
||||
|
||||
|
||||
private[akka] def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
|
||||
val set = remoteActorSets.get(remoteServerAddress)
|
||||
if (set ne null) set
|
||||
|
|
@ -106,7 +106,7 @@ object RemoteServer {
|
|||
}
|
||||
}
|
||||
|
||||
private[remote] def serverFor(hostname: String, port: Int): Option[RemoteServer] = {
|
||||
private[akka] def serverFor(hostname: String, port: Int): Option[RemoteServer] = {
|
||||
val server = remoteServers.get(Address(hostname, port))
|
||||
if (server eq null) None
|
||||
else Some(server)
|
||||
|
|
@ -114,7 +114,7 @@ object RemoteServer {
|
|||
|
||||
private[remote] def register(hostname: String, port: Int, server: RemoteServer) =
|
||||
remoteServers.put(Address(hostname, port), server)
|
||||
|
||||
|
||||
private[remote] def unregister(hostname: String, port: Int) =
|
||||
remoteServers.remove(Address(hostname, port))
|
||||
}
|
||||
|
|
@ -141,8 +141,7 @@ class RemoteServer extends Logging {
|
|||
private var hostname = RemoteServer.HOSTNAME
|
||||
private var port = RemoteServer.PORT
|
||||
|
||||
@volatile private var isRunning = false
|
||||
@volatile private var isConfigured = false
|
||||
@volatile private var _isRunning = false
|
||||
|
||||
private val factory = new NioServerSocketChannelFactory(
|
||||
Executors.newCachedThreadPool,
|
||||
|
|
@ -153,6 +152,8 @@ class RemoteServer extends Logging {
|
|||
// group of open channels, used for clean-up
|
||||
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-remote-server")
|
||||
|
||||
def isRunning = _isRunning
|
||||
|
||||
def start: Unit = start(None)
|
||||
|
||||
def start(loader: Option[ClassLoader]): Unit = start(hostname, port, loader)
|
||||
|
|
@ -161,7 +162,7 @@ class RemoteServer extends Logging {
|
|||
|
||||
def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): Unit = synchronized {
|
||||
try {
|
||||
if (!isRunning) {
|
||||
if (!_isRunning) {
|
||||
hostname = _hostname
|
||||
port = _port
|
||||
log.info("Starting remote server at [%s:%s]", hostname, port)
|
||||
|
|
@ -174,20 +175,22 @@ class RemoteServer extends Logging {
|
|||
bootstrap.setOption("child.reuseAddress", true)
|
||||
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS)
|
||||
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
|
||||
isRunning = true
|
||||
_isRunning = true
|
||||
Cluster.registerLocalNode(hostname, port)
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case e => log.error(e, "Could not start up remote server")
|
||||
}
|
||||
}
|
||||
|
||||
def shutdown = if (isRunning) {
|
||||
RemoteServer.unregister(hostname, port)
|
||||
openChannels.disconnect
|
||||
openChannels.close.awaitUninterruptibly
|
||||
bootstrap.releaseExternalResources
|
||||
Cluster.deregisterLocalNode(hostname, port)
|
||||
def shutdown = synchronized {
|
||||
if (_isRunning) {
|
||||
RemoteServer.unregister(hostname, port)
|
||||
openChannels.disconnect
|
||||
openChannels.close.awaitUninterruptibly
|
||||
bootstrap.releaseExternalResources
|
||||
Cluster.deregisterLocalNode(hostname, port)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: register active object in RemoteServer as well
|
||||
|
|
@ -195,17 +198,21 @@ class RemoteServer extends Logging {
|
|||
/**
|
||||
* Register Remote Actor by the Actor's 'id' field.
|
||||
*/
|
||||
def register(actor: Actor) = if (isRunning) {
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor)
|
||||
def register(actor: Actor) = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register Remote Actor by a specific 'id' passed as argument.
|
||||
* Register Remote Actor by a specific 'id' passed as argument.
|
||||
*/
|
||||
def register(id: String, actor: Actor) = if (isRunning) {
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor)
|
||||
def register(id: String, actor: Actor) = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -232,9 +239,9 @@ class RemoteServerPipelineFactory(
|
|||
//case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder))
|
||||
case _ => None
|
||||
}
|
||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, activeObjects)
|
||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, activeObjects)
|
||||
|
||||
val stages: Array[ChannelHandler] =
|
||||
val stages: Array[ChannelHandler] =
|
||||
zipCodec.map(codec => Array(codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteServer))
|
||||
.getOrElse(Array(lenDec, protobufDec, lenPrep, protobufEnc, remoteServer))
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
|
|
@ -294,7 +301,8 @@ class RemoteServerHandler(
|
|||
private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
|
||||
log.debug("Dispatching to remote actor [%s]", request.getTarget)
|
||||
val actor = createActor(request.getTarget, request.getUuid, request.getTimeout)
|
||||
|
||||
actor.start
|
||||
|
||||
val message = RemoteProtocolBuilder.getMessage(request)
|
||||
if (request.getIsOneWay) {
|
||||
if (request.hasSourceHostname && request.hasSourcePort) {
|
||||
|
|
@ -389,19 +397,6 @@ class RemoteServerHandler(
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
private def continueTransaction(request: RemoteRequest) = {
|
||||
val tx = request.tx
|
||||
if (tx.isDefined) {
|
||||
tx.get.reinit
|
||||
TransactionManagement.threadBoundTx.set(tx)
|
||||
setThreadLocalTransaction(tx.transaction)
|
||||
} else {
|
||||
TransactionManagement.threadBoundTx.set(None)
|
||||
setThreadLocalTransaction(null)
|
||||
}
|
||||
}
|
||||
*/
|
||||
private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]], timeout: Long) = {
|
||||
val unescapedArgs = new Array[AnyRef](args.size)
|
||||
val unescapedArgClasses = new Array[Class[_]](args.size)
|
||||
|
|
@ -410,7 +405,7 @@ class RemoteServerHandler(
|
|||
val arg = args(i)
|
||||
if (arg.isInstanceOf[String] && arg.asInstanceOf[String].startsWith(AW_PROXY_PREFIX)) {
|
||||
val argString = arg.asInstanceOf[String]
|
||||
val proxyName = argString.replace(AW_PROXY_PREFIX, "") //argString.substring(argString.indexOf("$$ProxiedByAW"), argString.length)
|
||||
val proxyName = argString.replace(AW_PROXY_PREFIX, "")
|
||||
val activeObject = createActiveObject(proxyName, timeout)
|
||||
unescapedArgs(i) = activeObject
|
||||
unescapedArgClasses(i) = Class.forName(proxyName)
|
||||
|
|
@ -440,6 +435,11 @@ class RemoteServerHandler(
|
|||
} else activeObjectOrNull
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
|
||||
* If actor already created then just return it from the registry.
|
||||
* Does not start the actor.
|
||||
*/
|
||||
private def createActor(name: String, uuid: String, timeout: Long): Actor = {
|
||||
val actorOrNull = actors.get(uuid)
|
||||
if (actorOrNull eq null) {
|
||||
|
|
@ -452,7 +452,6 @@ class RemoteServerHandler(
|
|||
newInstance.timeout = timeout
|
||||
newInstance._remoteAddress = None
|
||||
actors.put(uuid, newInstance)
|
||||
newInstance.start
|
||||
newInstance
|
||||
} catch {
|
||||
case e =>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue