Removing linkedActorsAsList, switching _linkedActors to be a volatile lazy val instead of Option with lazy semantics

This commit is contained in:
Viktor Klang 2010-10-08 15:43:00 +02:00
parent c045dd1325
commit 90831a9767
4 changed files with 18 additions and 31 deletions

View file

@ -600,8 +600,6 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
protected[akka] def linkedActors: JMap[Uuid, ActorRef]
protected[akka] def linkedActorsAsList: List[ActorRef]
override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid)
override def equals(that: Any): Boolean = {
@ -640,7 +638,7 @@ class LocalActorRef private[akka] (
@volatile
private[akka] var _remoteAddress: Option[InetSocketAddress] = None // only mutable to maintain identity across nodes
@volatile
private[akka] var _linkedActors: Option[ConcurrentHashMap[Uuid, ActorRef]] = None
private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef]
@volatile
private[akka] var _supervisor: Option[ActorRef] = None
@volatile
@ -944,9 +942,12 @@ class LocalActorRef private[akka] (
/**
* Shuts down and removes all linked actors.
*/
def shutdownLinkedActors(): Unit = {
linkedActorsAsList.foreach(_.stop)
linkedActors.clear
def shutdownLinkedActors() {
val i = linkedActors.values.iterator
while(i.hasNext) {
i.next.stop
i.remove
}
}
/**
@ -1082,7 +1083,8 @@ class LocalActorRef private[akka] (
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = {
linkedActorsAsList.foreach { actorRef =>
import scala.collection.JavaConversions._
linkedActors.values foreach { actorRef =>
actorRef.lifeCycle match {
// either permanent or none where default is permanent
case Temporary => shutDownTemporaryActor(actorRef)
@ -1099,16 +1101,7 @@ class LocalActorRef private[akka] (
} else None
}
protected[akka] def linkedActors: JMap[Uuid, ActorRef] = guard.withGuard {
if (_linkedActors.isEmpty) {
val actors = new ConcurrentHashMap[Uuid, ActorRef]
_linkedActors = Some(actors)
actors
} else _linkedActors.get
}
protected[akka] def linkedActorsAsList: List[ActorRef] =
linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]]
protected[akka] def linkedActors: JMap[Uuid, ActorRef] = _linkedActors
// ========= PRIVATE FUNCTIONS =========
@ -1411,7 +1404,6 @@ private[akka] case class RemoteActorRef private[akka] (
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def linkedActors: JMap[Uuid, ActorRef] = unsupported
protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = unsupported
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported

View file

@ -187,7 +187,7 @@ object ActorRegistry extends ListenerManagement {
def typedActorFor[T <: AnyRef](implicit manifest: Manifest[T]): Option[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
def predicate(proxy: AnyRef) : Boolean = {
val actorRef = actorFor(proxy)
val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
actorRef.isDefined && manifest.erasure.isAssignableFrom(actorRef.get.actor.getClass)
}
findTypedActor({ case a:AnyRef if predicate(a) => a })
@ -199,7 +199,7 @@ object ActorRegistry extends ListenerManagement {
def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
def predicate(proxy: AnyRef) : Boolean = {
val actorRef = actorFor(proxy)
val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass)
}
filterTypedActors(predicate)
@ -233,13 +233,6 @@ object ActorRegistry extends ListenerManagement {
TypedActorModule.typedActorObjectInstance.get.proxyFor(actorRef)
}
/**
* Get the underlying typed actor for a given proxy.
*/
private def actorFor(proxy: AnyRef): Option[ActorRef] = {
TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
}
/**
* Registers an actor in the ActorRegistry.

View file

@ -8,9 +8,9 @@ import java.util.{TimerTask, Timer}
import java.io.IOException
import com.rabbitmq.client._
import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters
import se.scalablesolutions.akka.actor.{Exit, Actor}
import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle}
import se.scalablesolutions.akka.config.OneForOneStrategy
import se.scalablesolutions.akka.actor.{ActorRef, Exit, Actor}
private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor {
import connectionParameters._
@ -68,8 +68,9 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
}
})
log.info("Successfully (re)connected to AMQP Server %s:%s [%s]", host, port, self.id)
log.debug("Sending new channel to %d already linked actors", self.linkedActorsAsList.size)
self.linkedActorsAsList.foreach(_ ! conn.createChannel)
log.debug("Sending new channel to %d already linked actors", self.linkedActors.size)
import scala.collection.JavaConversions._
self.linkedActors.values.iterator.foreach(_ ! conn.createChannel)
notifyCallback(Connected)
}
} catch {

View file

@ -66,7 +66,8 @@ object RemoteNode extends RemoteServer
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteServer {
object
RemoteServer {
val UUID_PREFIX = "uuid:"
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)