diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 208f317362..eefcccced6 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -600,8 +600,6 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi protected[akka] def linkedActors: JMap[Uuid, ActorRef] - protected[akka] def linkedActorsAsList: List[ActorRef] - override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid) override def equals(that: Any): Boolean = { @@ -640,7 +638,7 @@ class LocalActorRef private[akka] ( @volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None // only mutable to maintain identity across nodes @volatile - private[akka] var _linkedActors: Option[ConcurrentHashMap[Uuid, ActorRef]] = None + private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] @volatile private[akka] var _supervisor: Option[ActorRef] = None @volatile @@ -944,9 +942,12 @@ class LocalActorRef private[akka] ( /** * Shuts down and removes all linked actors. */ - def shutdownLinkedActors(): Unit = { - linkedActorsAsList.foreach(_.stop) - linkedActors.clear + def shutdownLinkedActors() { + val i = linkedActors.values.iterator + while(i.hasNext) { + i.next.stop + i.remove + } } /** @@ -1082,7 +1083,8 @@ class LocalActorRef private[akka] ( } protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = { - linkedActorsAsList.foreach { actorRef => + import scala.collection.JavaConversions._ + linkedActors.values foreach { actorRef => actorRef.lifeCycle match { // either permanent or none where default is permanent case Temporary => shutDownTemporaryActor(actorRef) @@ -1099,16 +1101,7 @@ class LocalActorRef private[akka] ( } else None } - protected[akka] def linkedActors: JMap[Uuid, ActorRef] = guard.withGuard { - if (_linkedActors.isEmpty) { - val actors = new ConcurrentHashMap[Uuid, ActorRef] - _linkedActors = Some(actors) - actors - } else _linkedActors.get - } - - protected[akka] def linkedActorsAsList: List[ActorRef] = - linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]] + protected[akka] def linkedActors: JMap[Uuid, ActorRef] = _linkedActors // ========= PRIVATE FUNCTIONS ========= @@ -1411,7 +1404,6 @@ private[akka] case class RemoteActorRef private[akka] ( protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported protected[akka] def linkedActors: JMap[Uuid, ActorRef] = unsupported - protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = unsupported protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala index 40f480c86e..41bff91132 100644 --- a/akka-actor/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala @@ -187,7 +187,7 @@ object ActorRegistry extends ListenerManagement { def typedActorFor[T <: AnyRef](implicit manifest: Manifest[T]): Option[AnyRef] = { TypedActorModule.ensureTypedActorEnabled def predicate(proxy: AnyRef) : Boolean = { - val actorRef = actorFor(proxy) + val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) actorRef.isDefined && manifest.erasure.isAssignableFrom(actorRef.get.actor.getClass) } findTypedActor({ case a:AnyRef if predicate(a) => a }) @@ -199,7 +199,7 @@ object ActorRegistry extends ListenerManagement { def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = { TypedActorModule.ensureTypedActorEnabled def predicate(proxy: AnyRef) : Boolean = { - val actorRef = actorFor(proxy) + val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass) } filterTypedActors(predicate) @@ -233,13 +233,6 @@ object ActorRegistry extends ListenerManagement { TypedActorModule.typedActorObjectInstance.get.proxyFor(actorRef) } - /** - * Get the underlying typed actor for a given proxy. - */ - private def actorFor(proxy: AnyRef): Option[ActorRef] = { - TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) - } - /** * Registers an actor in the ActorRegistry. diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala index 2bb821d5eb..77df7f887e 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala @@ -8,9 +8,9 @@ import java.util.{TimerTask, Timer} import java.io.IOException import com.rabbitmq.client._ import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters -import se.scalablesolutions.akka.actor.{Exit, Actor} import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle} import se.scalablesolutions.akka.config.OneForOneStrategy +import se.scalablesolutions.akka.actor.{ActorRef, Exit, Actor} private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor { import connectionParameters._ @@ -68,8 +68,9 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio } }) log.info("Successfully (re)connected to AMQP Server %s:%s [%s]", host, port, self.id) - log.debug("Sending new channel to %d already linked actors", self.linkedActorsAsList.size) - self.linkedActorsAsList.foreach(_ ! conn.createChannel) + log.debug("Sending new channel to %d already linked actors", self.linkedActors.size) + import scala.collection.JavaConversions._ + self.linkedActors.values.iterator.foreach(_ ! conn.createChannel) notifyCallback(Connected) } } catch { diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 40b312f879..deb3c05b87 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -66,7 +66,8 @@ object RemoteNode extends RemoteServer * * @author Jonas Bonér */ -object RemoteServer { +object +RemoteServer { val UUID_PREFIX = "uuid:" val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") val PORT = config.getInt("akka.remote.server.port", 9999)