From f9750d8dc6bcee22ea0a31c6e9f2f674f03ddb82 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 3 Aug 2010 11:25:09 +0200 Subject: [PATCH] Closing ticket 352 --- .../src/main/scala/actor/ActorRegistry.scala | 41 +++++++++---------- .../test/scala/misc/ActorRegistrySpec.scala | 11 +++++ .../src/main/scala/akka/SimpleService.scala | 4 +- .../src/main/scala/SimpleService.scala | 8 ++-- .../src/main/scala/SimpleService.scala | 4 +- 5 files changed, 39 insertions(+), 29 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index aea37432b7..953540fc70 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -31,7 +31,6 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent object ActorRegistry extends ListenerManagement { private val actorsByUUID = new ConcurrentHashMap[String, ActorRef] private val actorsById = new ConcurrentHashMap[String, JSet[ActorRef]] - private val actorsByClassName = new ConcurrentHashMap[String, JSet[ActorRef]] /** * Returns all actors in the system. @@ -46,6 +45,21 @@ object ActorRegistry extends ListenerManagement { while (elements.hasMoreElements) f(elements.nextElement) } + /** + * Invokes the function on all known actors until it returns Some + * Returns None if the function never returns Some + */ + def find[T](f: (ActorRef) => Option[T]) : Option[T] = { + val elements = actorsByUUID.elements + while (elements.hasMoreElements) { + val result = f(elements.nextElement) + + if(result.isDefined) + return result + } + None + } + /** * Finds all actors that are subtypes of the class passed in as the Manifest argument and supproting passed message. */ @@ -71,22 +85,20 @@ object ActorRegistry extends ListenerManagement { * Finds all actors that are subtypes of the class passed in as the Manifest argument. */ def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorRef] = - filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass)) + actorsFor[T](manifest.erasure.asInstanceOf[Class[T]]) /** * Finds any actor that matches T. + * FIXME: Improve performance by breaking out after the first match */ def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] = actorsFor[T](manifest).headOption /** - * Finds all actors of the exact type specified by the class passed in as the Class argument. + * Finds all actors of type or sub-type specified by the class passed in as the Class argument. */ - def actorsFor[T <: Actor](clazz: Class[T]): List[ActorRef] = { - if (actorsByClassName.containsKey(clazz.getName)) { - actorsByClassName.get(clazz.getName).toArray.toList.asInstanceOf[List[ActorRef]] - } else Nil - } + def actorsFor[T <: Actor](clazz: Class[T]): List[ActorRef] = + filter(a => clazz.isAssignableFrom(a.actor.getClass)) /** * Finds all actors that has a specific id. @@ -122,15 +134,6 @@ object ActorRegistry extends ListenerManagement { actorsById.put(id, set) } - // Class name - val className = actor.actorClassName - if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).add(actor) - else { - val set = new ConcurrentSkipListSet[ActorRef] - set.add(actor) - actorsByClassName.put(className, set) - } - // notify listeners foreachListener(_ ! ActorRegistered(actor)) } @@ -144,9 +147,6 @@ object ActorRegistry extends ListenerManagement { val id = actor.id if (actorsById.containsKey(id)) actorsById.get(id).remove(actor) - val className = actor.actorClassName - if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).remove(actor) - // notify listeners foreachListener(_ ! ActorUnregistered(actor)) } @@ -159,7 +159,6 @@ object ActorRegistry extends ListenerManagement { foreach(_.stop) actorsByUUID.clear actorsById.clear - actorsByClassName.clear log.info("All actors have been shut down and unregistered from ActorRegistry") } } diff --git a/akka-core/src/test/scala/misc/ActorRegistrySpec.scala b/akka-core/src/test/scala/misc/ActorRegistrySpec.scala index 6914472e2c..d6fc463a65 100644 --- a/akka-core/src/test/scala/misc/ActorRegistrySpec.scala +++ b/akka-core/src/test/scala/misc/ActorRegistrySpec.scala @@ -76,6 +76,17 @@ class ActorRegistrySpec extends JUnitSuite { actor.stop } + @Test def shouldFindThingsFromActorRegistry { + ActorRegistry.shutdownAll + val actor = actorOf[TestActor] + actor.start + val found = ActorRegistry.find(a => if(a.actor.isInstanceOf[TestActor]) Some(a) else None) + assert(found.isDefined) + assert(found.get.actor.isInstanceOf[TestActor]) + assert(found.get.id === "MyID") + actor.stop + } + @Test def shouldGetActorsByIdFromActorRegistry { ActorRegistry.shutdownAll val actor1 = actorOf[TestActor] diff --git a/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala b/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala index b361fbb16b..d5358a7d89 100644 --- a/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala +++ b/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala @@ -64,7 +64,7 @@ object SimpleRestService extends RestHelper { case Get("liftcount" :: _, req) => //Fetch the first actor of type SimpleServiceActor //Send it the "Tick" message and expect a Node back - val result = for( a <- ActorRegistry.actorsFor(classOf[SimpleServiceActor]).headOption; + val result = for( a <- ActorRegistry.actorFor[SimpleServiceActor]; r <- (a !! "Tick").as[Node] ) yield r //Return either the resulting NodeSeq or a default one @@ -85,7 +85,7 @@ object SimpleRestService extends RestHelper { case Get("persistentliftcount" :: _, req) => //Fetch the first actor of type SimpleServiceActor //Send it the "Tick" message and expect a Node back - val result = for( a <- ActorRegistry.actorsFor(classOf[PersistentServiceActor]).headOption; + val result = for( a <- ActorRegistry.actorFor[PersistentServiceActor]; r <- (a !! "Tick").as[Node] ) yield r //Return either the resulting NodeSeq or a default one diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala index fc96bba182..c3b71a3fdf 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala @@ -16,7 +16,7 @@ import java.lang.Integer import java.nio.ByteBuffer import javax.ws.rs.core.MultivaluedMap import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam} -import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor +import se.scalablesolutions.akka.actor.ActorRegistry.actorFor import org.atmosphere.annotation.{Broadcast, Suspend,Cluster} import org.atmosphere.util.XSSHtmlFilter import org.atmosphere.cpr.{Broadcaster, BroadcastFilter} @@ -53,7 +53,7 @@ class SimpleService { def count = { //Fetch the first actor of type SimpleServiceActor //Send it the "Tick" message and expect a NodeSeq back - val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption + val result = for{a <- actorFor[SimpleServiceActor] r <- (a !! "Tick").as[NodeSeq]} yield r //Return either the resulting NodeSeq or a default one result getOrElse Error in counter @@ -108,7 +108,7 @@ class PersistentSimpleService { def count = { //Fetch the first actor of type PersistentSimpleServiceActor //Send it the "Tick" message and expect a NodeSeq back - val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption + val result = for{a <- actorFor[PersistentSimpleServiceActor] r <- (a !! "Tick").as[NodeSeq]} yield r //Return either the resulting NodeSeq or a default one result getOrElse Error in counter @@ -155,7 +155,7 @@ class Chat { val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message")) //Fetch the first actor of type ChatActor //Send it the "Tick" message and expect a NodeSeq back - val result = for{a <- actorsFor(classOf[ChatActor]).headOption + val result = for{a <- actorFor[ChatActor] r <- (a !! msg).as[String]} yield r //Return either the resulting String or a default one result getOrElse "System__error" diff --git a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala index e5c8029eb8..02af6174c6 100644 --- a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala @@ -10,7 +10,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo} import se.scalablesolutions.akka.stm.TransactionalMap -import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor +import se.scalablesolutions.akka.actor.ActorRegistry.actorFor class Boot { val factory = SupervisorFactory( @@ -122,7 +122,7 @@ class SecureTickService { def tick = { //Fetch the first actor of type PersistentSimpleServiceActor //Send it the "Tick" message and expect a NdeSeq back - val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption + val result = for{a <- actorFor[SecureTickActor] r <- (a !! "Tick").as[Integer]} yield r //Return either the resulting NodeSeq or a default one result match {