Fixed issue with shutting down cluster correctly + Improved chat sample README
This commit is contained in:
parent
6b4bceec00
commit
28041ec21c
6 changed files with 55 additions and 27 deletions
|
|
@ -43,7 +43,7 @@ import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object RemoteNode extends RemoteServer(true)
|
||||
object RemoteNode extends RemoteServer
|
||||
|
||||
/**
|
||||
* This object holds configuration variables.
|
||||
|
|
@ -88,7 +88,8 @@ object RemoteServer {
|
|||
}
|
||||
|
||||
private val remoteActorSets = new ConcurrentHashMap[Address, RemoteActorSet]
|
||||
|
||||
private val remoteServers = new ConcurrentHashMap[Address, RemoteServer]
|
||||
|
||||
def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
|
||||
val set = remoteActorSets.get(remoteServerAddress)
|
||||
if (set ne null) set
|
||||
|
|
@ -98,6 +99,20 @@ object RemoteServer {
|
|||
remoteActorSet
|
||||
}
|
||||
}
|
||||
|
||||
def serverFor(hostname: String, port: Int): Option[RemoteServer] = {
|
||||
val server = remoteServers.get(Address(hostname, port))
|
||||
if (server eq null) None
|
||||
else Some(server)
|
||||
}
|
||||
|
||||
private[remote] def register(hostname: String, port: Int, server: RemoteServer) =
|
||||
remoteServers.put(Address(hostname, port), server)
|
||||
|
||||
private[remote] def unregister(hostname: String, port: Int) =
|
||||
remoteServers.remove(Address(hostname, port))
|
||||
|
||||
private[remote] def canShutDownCluster: Boolean = remoteServers.isEmpty
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -116,9 +131,8 @@ object RemoteServer {
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteServer(val registerNodeInCluster: Boolean) extends Logging {
|
||||
class RemoteServer extends Logging {
|
||||
val name = "RemoteServer@" + hostname + ":" + port
|
||||
def this() = this(false)
|
||||
|
||||
private var hostname = RemoteServer.HOSTNAME
|
||||
private var port = RemoteServer.PORT
|
||||
|
|
@ -147,6 +161,7 @@ class RemoteServer(val registerNodeInCluster: Boolean) extends Logging {
|
|||
hostname = _hostname
|
||||
port = _port
|
||||
log.info("Starting remote server at [%s:%s]", hostname, port)
|
||||
RemoteServer.register(hostname, port, this)
|
||||
val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
|
||||
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects)
|
||||
bootstrap.setPipelineFactory(pipelineFactory)
|
||||
|
|
@ -156,7 +171,7 @@ class RemoteServer(val registerNodeInCluster: Boolean) extends Logging {
|
|||
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS)
|
||||
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
|
||||
isRunning = true
|
||||
if (registerNodeInCluster) Cluster.registerLocalNode(hostname, port)
|
||||
Cluster.registerLocalNode(hostname, port)
|
||||
}
|
||||
} catch {
|
||||
case e => log.error(e, "Could not start up remote server")
|
||||
|
|
@ -164,14 +179,13 @@ class RemoteServer(val registerNodeInCluster: Boolean) extends Logging {
|
|||
}
|
||||
|
||||
def shutdown = {
|
||||
RemoteServer.unregister(hostname, port)
|
||||
openChannels.disconnect
|
||||
openChannels.unbind
|
||||
openChannels.close.awaitUninterruptibly(1000)
|
||||
bootstrap.releaseExternalResources
|
||||
if (registerNodeInCluster) {
|
||||
Cluster.deregisterLocalNode(hostname, port)
|
||||
Cluster.shutdown
|
||||
}
|
||||
Cluster.deregisterLocalNode(hostname, port)
|
||||
if (RemoteServer.canShutDownCluster) Cluster.shutdown
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue