Closing ticket 352
This commit is contained in:
parent
5809a01c19
commit
f9750d8dc6
5 changed files with 39 additions and 29 deletions
|
|
@ -31,7 +31,6 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
|
|||
object ActorRegistry extends ListenerManagement {
|
||||
private val actorsByUUID = new ConcurrentHashMap[String, ActorRef]
|
||||
private val actorsById = new ConcurrentHashMap[String, JSet[ActorRef]]
|
||||
private val actorsByClassName = new ConcurrentHashMap[String, JSet[ActorRef]]
|
||||
|
||||
/**
|
||||
* Returns all actors in the system.
|
||||
|
|
@ -46,6 +45,21 @@ object ActorRegistry extends ListenerManagement {
|
|||
while (elements.hasMoreElements) f(elements.nextElement)
|
||||
}
|
||||
|
||||
/**
|
||||
* Invokes the function on all known actors until it returns Some
|
||||
* Returns None if the function never returns Some
|
||||
*/
|
||||
def find[T](f: (ActorRef) => Option[T]) : Option[T] = {
|
||||
val elements = actorsByUUID.elements
|
||||
while (elements.hasMoreElements) {
|
||||
val result = f(elements.nextElement)
|
||||
|
||||
if(result.isDefined)
|
||||
return result
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds all actors that are subtypes of the class passed in as the Manifest argument and supproting passed message.
|
||||
*/
|
||||
|
|
@ -71,22 +85,20 @@ object ActorRegistry extends ListenerManagement {
|
|||
* Finds all actors that are subtypes of the class passed in as the Manifest argument.
|
||||
*/
|
||||
def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorRef] =
|
||||
filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass))
|
||||
actorsFor[T](manifest.erasure.asInstanceOf[Class[T]])
|
||||
|
||||
/**
|
||||
* Finds any actor that matches T.
|
||||
* FIXME: Improve performance by breaking out after the first match
|
||||
*/
|
||||
def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] =
|
||||
actorsFor[T](manifest).headOption
|
||||
|
||||
/**
|
||||
* Finds all actors of the exact type specified by the class passed in as the Class argument.
|
||||
* Finds all actors of type or sub-type specified by the class passed in as the Class argument.
|
||||
*/
|
||||
def actorsFor[T <: Actor](clazz: Class[T]): List[ActorRef] = {
|
||||
if (actorsByClassName.containsKey(clazz.getName)) {
|
||||
actorsByClassName.get(clazz.getName).toArray.toList.asInstanceOf[List[ActorRef]]
|
||||
} else Nil
|
||||
}
|
||||
def actorsFor[T <: Actor](clazz: Class[T]): List[ActorRef] =
|
||||
filter(a => clazz.isAssignableFrom(a.actor.getClass))
|
||||
|
||||
/**
|
||||
* Finds all actors that has a specific id.
|
||||
|
|
@ -122,15 +134,6 @@ object ActorRegistry extends ListenerManagement {
|
|||
actorsById.put(id, set)
|
||||
}
|
||||
|
||||
// Class name
|
||||
val className = actor.actorClassName
|
||||
if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).add(actor)
|
||||
else {
|
||||
val set = new ConcurrentSkipListSet[ActorRef]
|
||||
set.add(actor)
|
||||
actorsByClassName.put(className, set)
|
||||
}
|
||||
|
||||
// notify listeners
|
||||
foreachListener(_ ! ActorRegistered(actor))
|
||||
}
|
||||
|
|
@ -144,9 +147,6 @@ object ActorRegistry extends ListenerManagement {
|
|||
val id = actor.id
|
||||
if (actorsById.containsKey(id)) actorsById.get(id).remove(actor)
|
||||
|
||||
val className = actor.actorClassName
|
||||
if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).remove(actor)
|
||||
|
||||
// notify listeners
|
||||
foreachListener(_ ! ActorUnregistered(actor))
|
||||
}
|
||||
|
|
@ -159,7 +159,6 @@ object ActorRegistry extends ListenerManagement {
|
|||
foreach(_.stop)
|
||||
actorsByUUID.clear
|
||||
actorsById.clear
|
||||
actorsByClassName.clear
|
||||
log.info("All actors have been shut down and unregistered from ActorRegistry")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -76,6 +76,17 @@ class ActorRegistrySpec extends JUnitSuite {
|
|||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldFindThingsFromActorRegistry {
|
||||
ActorRegistry.shutdownAll
|
||||
val actor = actorOf[TestActor]
|
||||
actor.start
|
||||
val found = ActorRegistry.find(a => if(a.actor.isInstanceOf[TestActor]) Some(a) else None)
|
||||
assert(found.isDefined)
|
||||
assert(found.get.actor.isInstanceOf[TestActor])
|
||||
assert(found.get.id === "MyID")
|
||||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldGetActorsByIdFromActorRegistry {
|
||||
ActorRegistry.shutdownAll
|
||||
val actor1 = actorOf[TestActor]
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ object SimpleRestService extends RestHelper {
|
|||
case Get("liftcount" :: _, req) =>
|
||||
//Fetch the first actor of type SimpleServiceActor
|
||||
//Send it the "Tick" message and expect a Node back
|
||||
val result = for( a <- ActorRegistry.actorsFor(classOf[SimpleServiceActor]).headOption;
|
||||
val result = for( a <- ActorRegistry.actorFor[SimpleServiceActor];
|
||||
r <- (a !! "Tick").as[Node] ) yield r
|
||||
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
|
|
@ -85,7 +85,7 @@ object SimpleRestService extends RestHelper {
|
|||
case Get("persistentliftcount" :: _, req) =>
|
||||
//Fetch the first actor of type SimpleServiceActor
|
||||
//Send it the "Tick" message and expect a Node back
|
||||
val result = for( a <- ActorRegistry.actorsFor(classOf[PersistentServiceActor]).headOption;
|
||||
val result = for( a <- ActorRegistry.actorFor[PersistentServiceActor];
|
||||
r <- (a !! "Tick").as[Node] ) yield r
|
||||
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import java.lang.Integer
|
|||
import java.nio.ByteBuffer
|
||||
import javax.ws.rs.core.MultivaluedMap
|
||||
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam}
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry.actorFor
|
||||
import org.atmosphere.annotation.{Broadcast, Suspend,Cluster}
|
||||
import org.atmosphere.util.XSSHtmlFilter
|
||||
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
|
||||
|
|
@ -53,7 +53,7 @@ class SimpleService {
|
|||
def count = {
|
||||
//Fetch the first actor of type SimpleServiceActor
|
||||
//Send it the "Tick" message and expect a NodeSeq back
|
||||
val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption
|
||||
val result = for{a <- actorFor[SimpleServiceActor]
|
||||
r <- (a !! "Tick").as[NodeSeq]} yield r
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
result getOrElse <error>Error in counter</error>
|
||||
|
|
@ -108,7 +108,7 @@ class PersistentSimpleService {
|
|||
def count = {
|
||||
//Fetch the first actor of type PersistentSimpleServiceActor
|
||||
//Send it the "Tick" message and expect a NodeSeq back
|
||||
val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption
|
||||
val result = for{a <- actorFor[PersistentSimpleServiceActor]
|
||||
r <- (a !! "Tick").as[NodeSeq]} yield r
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
result getOrElse <error>Error in counter</error>
|
||||
|
|
@ -155,7 +155,7 @@ class Chat {
|
|||
val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message"))
|
||||
//Fetch the first actor of type ChatActor
|
||||
//Send it the "Tick" message and expect a NodeSeq back
|
||||
val result = for{a <- actorsFor(classOf[ChatActor]).headOption
|
||||
val result = for{a <- actorFor[ChatActor]
|
||||
r <- (a !! msg).as[String]} yield r
|
||||
//Return either the resulting String or a default one
|
||||
result getOrElse "System__error"
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._
|
|||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo}
|
||||
import se.scalablesolutions.akka.stm.TransactionalMap
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry.actorFor
|
||||
|
||||
class Boot {
|
||||
val factory = SupervisorFactory(
|
||||
|
|
@ -122,7 +122,7 @@ class SecureTickService {
|
|||
def tick = {
|
||||
//Fetch the first actor of type PersistentSimpleServiceActor
|
||||
//Send it the "Tick" message and expect a NdeSeq back
|
||||
val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption
|
||||
val result = for{a <- actorFor[SecureTickActor]
|
||||
r <- (a !! "Tick").as[Integer]} yield r
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
result match {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue