Merge branch 'master' of git@github.com:jboner/akka

Conflicts:
	akka-remote/src/main/scala/remote/RemoteServer.scala
This commit is contained in:
Jonas Bonér 2010-09-30 16:10:39 +02:00
commit bf33856abd
5 changed files with 69 additions and 53 deletions

View file

@ -4,15 +4,16 @@
package se.scalablesolutions.akka.actor
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.{ListBuffer, Map}
import scala.reflect.Manifest
import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap}
import java.util.{Set => JSet}
import annotation.tailrec
import se.scalablesolutions.akka.util.ListenerManagement
import se.scalablesolutions.akka.util.ReflectiveAccess._
import se.scalablesolutions.akka.util.{ReadWriteGuard, Address, ListenerManagement}
import java.net.InetSocketAddress
/**
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
@ -38,6 +39,8 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
object ActorRegistry extends ListenerManagement {
private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef]
private val actorsById = new Index[String,ActorRef]
private val remoteActorSets = Map[Address, RemoteActorSet]()
private val guard = new ReadWriteGuard
/**
* Returns all actors in the system.
@ -279,6 +282,33 @@ object ActorRegistry extends ListenerManagement {
actorsById.clear
log.info("All actors have been shut down and unregistered from ActorRegistry")
}
/**
* Get the remote actors for the given server address. For internal use only.
*/
private[akka] def actorsFor(remoteServerAddress: Address): RemoteActorSet = guard.withWriteGuard {
remoteActorSets.getOrElseUpdate(remoteServerAddress, new RemoteActorSet)
}
private[akka] def registerActorByUuid(address: InetSocketAddress, uuid: String, actor: ActorRef) {
actorsByUuid(Address(address.getHostName, address.getPort)).putIfAbsent(uuid, actor)
}
private[akka] def registerTypedActorByUuid(address: InetSocketAddress, uuid: String, typedActor: AnyRef) {
typedActorsByUuid(Address(address.getHostName, address.getPort)).putIfAbsent(uuid, typedActor)
}
private[akka] def actors(address: Address) = actorsFor(address).actors
private[akka] def actorsByUuid(address: Address) = actorsFor(address).actorsByUuid
private[akka] def typedActors(address: Address) = actorsFor(address).typedActors
private[akka] def typedActorsByUuid(address: Address) = actorsFor(address).typedActorsByUuid
private[akka] class RemoteActorSet {
private[ActorRegistry] val actors = new ConcurrentHashMap[String, ActorRef]
private[ActorRegistry] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[ActorRegistry] val typedActors = new ConcurrentHashMap[String, AnyRef]
private[ActorRegistry] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
}
}
class Index[K <: AnyRef,V <: AnyRef : Manifest] {

View file

@ -0,0 +1,23 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
object Address {
def apply(hostname: String, port: Int) = new Address(hostname, port)
}
class Address(val hostname: String, val port: Int) {
override def hashCode: Int = {
var result = HashCode.SEED
result = HashCode.hash(result, hostname)
result = HashCode.hash(result, port)
result
}
override def equals(that: Any): Boolean = {
that.isInstanceOf[Address] &&
that.asInstanceOf[Address].hostname == hostname &&
that.asInstanceOf[Address].port == port
}
}

View file

@ -7,7 +7,6 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _}
import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, ActorType, RemoteActorRef, IllegalActorStateException}
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import se.scalablesolutions.akka.util.{ListenerManagement, Logging, Duration}
import se.scalablesolutions.akka.actor.{Uuid,newUuid,uuidFrom}
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.serialization.RemoteActorSerialization._
@ -31,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.{HashSet, HashMap}
import scala.reflect.BeanProperty
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.util.{Address, ListenerManagement, Logging, Duration}
/**
* Life-cycle events for RemoteClient.
@ -63,7 +63,7 @@ object RemoteClient extends Logging {
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT)
private val remoteClients = new HashMap[String, RemoteClient]
private val remoteActors = new HashMap[RemoteServer.Address, HashSet[Uuid]]
private val remoteActors = new HashMap[Address, HashSet[Uuid]]
def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef =
actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None)
@ -163,16 +163,16 @@ object RemoteClient extends Logging {
}
def register(hostname: String, port: Int, uuid: Uuid) = synchronized {
actorsFor(RemoteServer.Address(hostname, port)) += uuid
actorsFor(Address(hostname, port)) += uuid
}
private[akka] def unregister(hostname: String, port: Int, uuid: Uuid) = synchronized {
val set = actorsFor(RemoteServer.Address(hostname, port))
val set = actorsFor(Address(hostname, port))
set -= uuid
if (set.isEmpty) shutdownClientFor(new InetSocketAddress(hostname, port))
}
private[akka] def actorsFor(remoteServerAddress: RemoteServer.Address): HashSet[Uuid] = {
private[akka] def actorsFor(remoteServerAddress: Address): HashSet[Uuid] = {
val set = remoteActors.get(remoteServerAddress)
if (set.isDefined && (set.get ne null)) set.get
else {

View file

@ -10,7 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.{Map => JMap}
import se.scalablesolutions.akka.actor.{
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid}
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util._
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
@ -103,43 +103,9 @@ object RemoteServer {
} else */false
}
object Address {
def apply(hostname: String, port: Int) = new Address(hostname, port)
}
class Address(val hostname: String, val port: Int) {
override def hashCode: Int = {
var result = HashCode.SEED
result = HashCode.hash(result, hostname)
result = HashCode.hash(result, port)
result
}
override def equals(that: Any): Boolean = {
that.isInstanceOf[Address] &&
that.asInstanceOf[Address].hostname == hostname &&
that.asInstanceOf[Address].port == port
}
}
private class RemoteActorSet {
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
private[RemoteServer] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
private[RemoteServer] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
}
private val guard = new ReadWriteGuard
private val remoteActorSets = Map[Address, RemoteActorSet]()
private val remoteServers = Map[Address, RemoteServer]()
private[akka] def registerActorByUuid(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actorsByUuid.put(uuid, actor)
}
private[akka] def registerTypedActorByUuid(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
}
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
serverFor(address) match {
case Some(server) => server
@ -161,10 +127,7 @@ object RemoteServer {
private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard {
remoteServers.remove(Address(hostname, port))
}
private def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
remoteActorSets.getOrElseUpdate(remoteServerAddress,new RemoteActorSet)
}
}
/**
@ -197,7 +160,7 @@ class RemoteServer extends Logging with ListenerManagement {
import RemoteServer._
def name = "RemoteServer@" + hostname + ":" + port
private[akka] var address = RemoteServer.Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
private[akka] var address = Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
def hostname = address.hostname
def port = address.port
@ -236,7 +199,7 @@ class RemoteServer extends Logging with ListenerManagement {
private def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized {
try {
if (!_isRunning) {
address = RemoteServer.Address(_hostname,_port)
address = Address(_hostname,_port)
log.info("Starting remote server at [%s:%s]", hostname, port)
RemoteServer.register(hostname, port, this)
val pipelineFactory = new RemoteServerPipelineFactory(
@ -379,10 +342,10 @@ class RemoteServer extends Logging with ListenerManagement {
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] def actors() = RemoteServer.actorsFor(address).actors
private[akka] def actorsByUuid() = RemoteServer.actorsFor(address).actorsByUuid
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
private[akka] def typedActorsByUuid() = RemoteServer.actorsFor(address).typedActorsByUuid
private[akka] def actors() = ActorRegistry.actors(address)
private[akka] def actorsByUuid() = ActorRegistry.actorsByUuid(address)
private[akka] def typedActors() = ActorRegistry.typedActors(address)
private[akka] def typedActorsByUuid() = ActorRegistry.typedActorsByUuid(address)
}
object RemoteServerSslContext {

View file

@ -250,7 +250,7 @@ object RemoteActorSerialization {
Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
RemoteServer.getOrCreateServer(homeAddress)
RemoteServer.registerActorByUuid(homeAddress, uuid.toString, ar)
ActorRegistry.registerActorByUuid(homeAddress, uuid.toString, ar)
RemoteActorRefProtocol.newBuilder
.setClassOrServiceName(uuid.toString)