diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala index 4099d64160..f34d9fc2c0 100644 --- a/akka-actor/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala @@ -4,15 +4,16 @@ package se.scalablesolutions.akka.actor -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.{ListBuffer, Map} import scala.reflect.Manifest import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap} import java.util.{Set => JSet} import annotation.tailrec -import se.scalablesolutions.akka.util.ListenerManagement import se.scalablesolutions.akka.util.ReflectiveAccess._ +import se.scalablesolutions.akka.util.{ReadWriteGuard, Address, ListenerManagement} +import java.net.InetSocketAddress /** * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry. @@ -38,6 +39,8 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent object ActorRegistry extends ListenerManagement { private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef] private val actorsById = new Index[String,ActorRef] + private val remoteActorSets = Map[Address, RemoteActorSet]() + private val guard = new ReadWriteGuard /** * Returns all actors in the system. @@ -279,6 +282,33 @@ object ActorRegistry extends ListenerManagement { actorsById.clear log.info("All actors have been shut down and unregistered from ActorRegistry") } + + /** + * Get the remote actors for the given server address. For internal use only. + */ + private[akka] def actorsFor(remoteServerAddress: Address): RemoteActorSet = guard.withWriteGuard { + remoteActorSets.getOrElseUpdate(remoteServerAddress, new RemoteActorSet) + } + + private[akka] def registerActorByUuid(address: InetSocketAddress, uuid: String, actor: ActorRef) { + actorsByUuid(Address(address.getHostName, address.getPort)).putIfAbsent(uuid, actor) + } + + private[akka] def registerTypedActorByUuid(address: InetSocketAddress, uuid: String, typedActor: AnyRef) { + typedActorsByUuid(Address(address.getHostName, address.getPort)).putIfAbsent(uuid, typedActor) + } + + private[akka] def actors(address: Address) = actorsFor(address).actors + private[akka] def actorsByUuid(address: Address) = actorsFor(address).actorsByUuid + private[akka] def typedActors(address: Address) = actorsFor(address).typedActors + private[akka] def typedActorsByUuid(address: Address) = actorsFor(address).typedActorsByUuid + + private[akka] class RemoteActorSet { + private[ActorRegistry] val actors = new ConcurrentHashMap[String, ActorRef] + private[ActorRegistry] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] + private[ActorRegistry] val typedActors = new ConcurrentHashMap[String, AnyRef] + private[ActorRegistry] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] + } } class Index[K <: AnyRef,V <: AnyRef : Manifest] { diff --git a/akka-actor/src/main/scala/util/Address.scala b/akka-actor/src/main/scala/util/Address.scala new file mode 100644 index 0000000000..34c3f51bd4 --- /dev/null +++ b/akka-actor/src/main/scala/util/Address.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.util + +object Address { + def apply(hostname: String, port: Int) = new Address(hostname, port) +} + +class Address(val hostname: String, val port: Int) { + override def hashCode: Int = { + var result = HashCode.SEED + result = HashCode.hash(result, hostname) + result = HashCode.hash(result, port) + result + } + + override def equals(that: Any): Boolean = { + that.isInstanceOf[Address] && + that.asInstanceOf[Address].hostname == hostname && + that.asInstanceOf[Address].port == port + } +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index bb9714bc61..2694c9aee8 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -7,7 +7,6 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _} import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, ActorType, RemoteActorRef, IllegalActorStateException} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} -import se.scalablesolutions.akka.util.{ListenerManagement, Logging, Duration} import se.scalablesolutions.akka.actor.{Uuid,newUuid,uuidFrom} import se.scalablesolutions.akka.config.Config._ import se.scalablesolutions.akka.serialization.RemoteActorSerialization._ @@ -31,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{HashSet, HashMap} import scala.reflect.BeanProperty import se.scalablesolutions.akka.actor._ +import se.scalablesolutions.akka.util.{Address, ListenerManagement, Logging, Duration} /** * Life-cycle events for RemoteClient. @@ -63,7 +63,7 @@ object RemoteClient extends Logging { val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) private val remoteClients = new HashMap[String, RemoteClient] - private val remoteActors = new HashMap[RemoteServer.Address, HashSet[Uuid]] + private val remoteActors = new HashMap[Address, HashSet[Uuid]] def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None) @@ -163,16 +163,16 @@ object RemoteClient extends Logging { } def register(hostname: String, port: Int, uuid: Uuid) = synchronized { - actorsFor(RemoteServer.Address(hostname, port)) += uuid + actorsFor(Address(hostname, port)) += uuid } private[akka] def unregister(hostname: String, port: Int, uuid: Uuid) = synchronized { - val set = actorsFor(RemoteServer.Address(hostname, port)) + val set = actorsFor(Address(hostname, port)) set -= uuid if (set.isEmpty) shutdownClientFor(new InetSocketAddress(hostname, port)) } - private[akka] def actorsFor(remoteServerAddress: RemoteServer.Address): HashSet[Uuid] = { + private[akka] def actorsFor(remoteServerAddress: Address): HashSet[Uuid] = { val set = remoteActors.get(remoteServerAddress) if (set.isDefined && (set.get ne null)) set.get else { diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 186e09bc02..27b9af15a2 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -10,7 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors} import java.util.{Map => JMap} import se.scalablesolutions.akka.actor.{ - Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid} + Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.util._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ @@ -103,43 +103,9 @@ object RemoteServer { } else */false } - object Address { - def apply(hostname: String, port: Int) = new Address(hostname, port) - } - - class Address(val hostname: String, val port: Int) { - override def hashCode: Int = { - var result = HashCode.SEED - result = HashCode.hash(result, hostname) - result = HashCode.hash(result, port) - result - } - override def equals(that: Any): Boolean = { - that.isInstanceOf[Address] && - that.asInstanceOf[Address].hostname == hostname && - that.asInstanceOf[Address].port == port - } - } - - private class RemoteActorSet { - private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] - private[RemoteServer] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] - private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] - private[RemoteServer] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] - } - private val guard = new ReadWriteGuard - private val remoteActorSets = Map[Address, RemoteActorSet]() private val remoteServers = Map[Address, RemoteServer]() - private[akka] def registerActorByUuid(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard { - actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actorsByUuid.put(uuid, actor) - } - - private[akka] def registerTypedActorByUuid(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard { - actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor) - } - private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard { serverFor(address) match { case Some(server) => server @@ -161,10 +127,7 @@ object RemoteServer { private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard { remoteServers.remove(Address(hostname, port)) } - - private def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = { - remoteActorSets.getOrElseUpdate(remoteServerAddress,new RemoteActorSet) - } + } /** @@ -197,7 +160,7 @@ class RemoteServer extends Logging with ListenerManagement { import RemoteServer._ def name = "RemoteServer@" + hostname + ":" + port - private[akka] var address = RemoteServer.Address(RemoteServer.HOSTNAME,RemoteServer.PORT) + private[akka] var address = Address(RemoteServer.HOSTNAME,RemoteServer.PORT) def hostname = address.hostname def port = address.port @@ -236,7 +199,7 @@ class RemoteServer extends Logging with ListenerManagement { private def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized { try { if (!_isRunning) { - address = RemoteServer.Address(_hostname,_port) + address = Address(_hostname,_port) log.info("Starting remote server at [%s:%s]", hostname, port) RemoteServer.register(hostname, port, this) val pipelineFactory = new RemoteServerPipelineFactory( @@ -379,10 +342,10 @@ class RemoteServer extends Logging with ListenerManagement { protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) - private[akka] def actors() = RemoteServer.actorsFor(address).actors - private[akka] def actorsByUuid() = RemoteServer.actorsFor(address).actorsByUuid - private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors - private[akka] def typedActorsByUuid() = RemoteServer.actorsFor(address).typedActorsByUuid + private[akka] def actors() = ActorRegistry.actors(address) + private[akka] def actorsByUuid() = ActorRegistry.actorsByUuid(address) + private[akka] def typedActors() = ActorRegistry.typedActors(address) + private[akka] def typedActorsByUuid() = ActorRegistry.typedActorsByUuid(address) } object RemoteServerSslContext { diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 5ec8bb6344..c9dd582978 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -250,7 +250,7 @@ object RemoteActorSerialization { Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) RemoteServer.getOrCreateServer(homeAddress) - RemoteServer.registerActorByUuid(homeAddress, uuid.toString, ar) + ActorRegistry.registerActorByUuid(homeAddress, uuid.toString, ar) RemoteActorRefProtocol.newBuilder .setClassOrServiceName(uuid.toString)