Added registration of remote actors in declarative supervisor config + Fixed bug in remote client reconnect + Added Redis as backend for Chat sample + Added UUID utility + Misc minor other fixes

This commit is contained in:
Jonas Bonér 2009-12-30 08:36:24 +01:00
parent 90f7e0ea4e
commit 0567a5780e
18 changed files with 157 additions and 88 deletions

View file

@ -7,6 +7,7 @@ package se.scalablesolutions.akka.remote
import java.lang.reflect.InvocationTargetException
import java.net.InetSocketAddress
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.{Map => JMap}
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.util._
@ -62,6 +63,41 @@ object RemoteServer {
"zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed")
level
}
object Address {
def apply(hostname: String, port: Int) = new Address(hostname, port)
}
class Address(val hostname: String, val port: Int) {
override def hashCode: Int = {
var result = HashCode.SEED
result = HashCode.hash(result, hostname)
result = HashCode.hash(result, port)
result
}
override def equals(that: Any): Boolean = {
that != null &&
that.isInstanceOf[Address] &&
that.asInstanceOf[Address].hostname == hostname &&
that.asInstanceOf[Address].port == port
}
}
class RemoteActorSet {
val actors = new ConcurrentHashMap[String, Actor]
val activeObjects = new ConcurrentHashMap[String, AnyRef]
}
private val remoteActorSets = new ConcurrentHashMap[Address, RemoteActorSet]
def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
val set = remoteActorSets.get(remoteServerAddress)
if (set ne null) set
else {
val remoteActorSet = new RemoteActorSet
remoteActorSets.put(remoteServerAddress, remoteActorSet)
remoteActorSet
}
}
}
/**
@ -84,7 +120,7 @@ class RemoteServer extends Logging {
val name = "RemoteServer@" + hostname + ":" + port
private var hostname = RemoteServer.HOSTNAME
private var port = RemoteServer.PORT
private var port = RemoteServer.PORT
@volatile private var isRunning = false
@volatile private var isConfigured = false
@ -96,7 +132,7 @@ class RemoteServer extends Logging {
private val bootstrap = new ServerBootstrap(factory)
// group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-server")
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-remote-server")
def start: Unit = start(None)
@ -110,7 +146,12 @@ class RemoteServer extends Logging {
hostname = _hostname
port = _port
log.info("Starting remote server at [%s:%s]", hostname, port)
bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, openChannels, loader))
println("======= ADDING actor for " + hostname + " - " + port)
println("======= " + RemoteServer.Address(hostname, port).hashCode)
println("======= " + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.size)
val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
@ -135,9 +176,11 @@ class RemoteServer extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServerPipelineFactory(
name: String,
openChannels: ChannelGroup,
loader: Option[ClassLoader]) extends ChannelPipelineFactory {
val name: String,
val openChannels: ChannelGroup,
val loader: Option[ClassLoader],
val actors: JMap[String, Actor],
val activeObjects: JMap[String, AnyRef]) extends ChannelPipelineFactory {
import RemoteServer._
def getPipeline: ChannelPipeline = {
@ -156,7 +199,7 @@ class RemoteServerPipelineFactory(
}
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4))
pipeline.addLast("protobufEncoder", new ProtobufEncoder)
pipeline.addLast("handler", new RemoteServerHandler(name, openChannels, loader))
pipeline.addLast("handler", new RemoteServerHandler(name, openChannels, loader, actors, activeObjects))
pipeline
}
}
@ -167,13 +210,12 @@ class RemoteServerPipelineFactory(
@ChannelPipelineCoverage {val value = "all"}
class RemoteServerHandler(
val name: String,
openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging {
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val actors: JMap[String, Actor],
val activeObjects: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
applicationLoader.foreach(RemoteProtocolBuilder.setClassLoader(_))
/**