closing ticket441, implemented typed actor methods for ActorRegistry

This commit is contained in:
Michael Kober 2010-09-28 11:12:40 +02:00
parent e02744425d
commit 2e9d87364a
4 changed files with 266 additions and 9 deletions

View file

@ -10,8 +10,9 @@ import scala.reflect.Manifest
import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap}
import java.util.{Set => JSet}
import se.scalablesolutions.akka.util.ListenerManagement
import annotation.tailrec
import se.scalablesolutions.akka.util.ListenerManagement
import se.scalablesolutions.akka.util.ReflectiveAccess._
/**
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
@ -109,11 +110,126 @@ object ActorRegistry extends ListenerManagement {
*/
def actorsFor(id: String): Array[ActorRef] = actorsById values id
/**
/**
* Finds the actor that has a specific UUID.
*/
def actorFor(uuid: Uuid): Option[ActorRef] = Option(actorsByUUID get uuid)
/**
* Returns all typed actors in the system.
*/
def typedActors: Array[AnyRef] = filterTypedActors(_ => true)
/**
* Invokes a function for all typed actors.
*/
def foreachTypedActor(f: (AnyRef) => Unit) = {
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val proxy = typedActorFor(elements.nextElement)
if (proxy.isDefined) {
f(proxy.get)
}
}
}
/**
* Invokes the function on all known typed actors until it returns Some
* Returns None if the function never returns Some
*/
def findTypedActor[T](f: PartialFunction[AnyRef,T]) : Option[T] = {
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val proxy = typedActorFor(elements.nextElement)
if(proxy.isDefined && (f isDefinedAt proxy))
return Some(f(proxy))
}
None
}
/**
* Finds all typed actors that satisfy a predicate.
*/
def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
val all = new ListBuffer[AnyRef]
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val proxy = typedActorFor(elements.nextElement)
if (proxy.isDefined && p(proxy.get)) {
all += proxy.get
}
}
all.toArray
}
/**
* Finds all typed actors that are subtypes of the class passed in as the Manifest argument.
*/
def typedActorsFor[T <: AnyRef](implicit manifest: Manifest[T]): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
typedActorsFor[T](manifest.erasure.asInstanceOf[Class[T]])
}
/**
* Finds any typed actor that matches T.
*/
def typedActorFor[T <: AnyRef](implicit manifest: Manifest[T]): Option[AnyRef] = {
def predicate(proxy: AnyRef) : Boolean = {
val actorRef = actorFor(proxy)
actorRef.isDefined && manifest.erasure.isAssignableFrom(actorRef.get.actor.getClass)
}
findTypedActor({ case a:AnyRef if predicate(a) => a })
}
/**
* Finds all typed actors of type or sub-type specified by the class passed in as the Class argument.
*/
def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
def predicate(proxy: AnyRef) : Boolean = {
val actorRef = actorFor(proxy)
actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass)
}
filterTypedActors(predicate)
}
/**
* Finds all typed actors that have a specific id.
*/
def typedActorsFor(id: String): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
val actorRefs = actorsById values id
actorRefs.flatMap(typedActorFor(_))
}
/**
* Finds the typed actor that has a specific UUID.
*/
def typedActorFor(uuid: Uuid): Option[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
val actorRef = actorsByUUID get uuid
if (actorRef eq null)
None
else
typedActorFor(actorRef)
}
/**
* Get the typed actor proxy for a given typed actor ref.
*/
private def typedActorFor(actorRef: ActorRef): Option[AnyRef] = {
TypedActorModule.typedActorObjectInstance.get.proxyFor(actorRef)
}
/**
* Get the underlying typed actor for a given proxy.
*/
private def actorFor(proxy: AnyRef): Option[ActorRef] = {
TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
}
/**
* Registers an actor in the ActorRegistry.
*/
@ -145,7 +261,20 @@ object ActorRegistry extends ListenerManagement {
*/
def shutdownAll() {
log.info("Shutting down all actors in the system...")
foreach(_.stop)
if (TypedActorModule.isTypedActorEnabled) {
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val actorRef = elements.nextElement
val proxy = typedActorFor(actorRef)
if (proxy.isDefined) {
TypedActorModule.typedActorObjectInstance.get.stop(proxy.get)
} else {
actorRef.stop
}
}
} else {
foreach(_.stop)
}
actorsByUUID.clear
actorsById.clear
log.info("All actors have been shut down and unregistered from ActorRegistry")

View file

@ -154,6 +154,9 @@ object ReflectiveAccess extends Logging {
type TypedActorObject = {
def isJoinPoint(message: Any): Boolean
def isJoinPointAndOneWay(message: Any): Boolean
def actorFor(proxy: AnyRef): Option[ActorRef]
def proxyFor(actorRef: ActorRef): Option[AnyRef]
def stop(anyRef: AnyRef) : Unit
}
lazy val isTypedActorEnabled = typedActorObjectInstance.isDefined

View file

@ -467,13 +467,24 @@ object TypedActor extends Logging {
def stop(proxy: AnyRef): Unit = AspectInitRegistry.unregister(proxy)
/**
* Get the underlying dispatcher actor for the given Typed Actor.
* Get the underlying typed actor for the given Typed Actor.
*/
def actorFor(proxy: AnyRef): Option[ActorRef] =
ActorRegistry
.actorsFor(classOf[TypedActor])
.find(a => a.actor.asInstanceOf[TypedActor].proxy == proxy)
/**
* Get the typed actor proxy for the given Typed Actor.
*/
def proxyFor(actorRef: ActorRef): Option[AnyRef] = {
if (actorRef.actor.isInstanceOf[TypedActor]) {
Some(actorRef.actor.asInstanceOf[TypedActor].proxy)
} else {
None
}
}
/**
* Links an other Typed Actor to this Typed Actor.
* @param supervisor the supervisor Typed Actor

View file

@ -7,25 +7,139 @@ package se.scalablesolutions.akka.actor
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture;
import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture
import TypedActorSpec._
object TypedActorSpec {
trait MyTypedActor {
def sendOneWay(msg: String) : Unit
}
class MyTypedActorImpl extends TypedActor with MyTypedActor {
self.id = "my-custom-id"
def sendOneWay(msg: String) {
println("got " + msg)
}
}
class MyActor extends Actor {
self.id = "my-custom-id"
def receive = {
case msg: String => println("got " + msg)
}
}
}
@RunWith(classOf[JUnitRunner])
class TypedActorSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
BeforeAndAfterEach {
var simplePojo: SimpleJavaPojo = null
var pojo: MyTypedActor = null;
override def beforeEach() {
simplePojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
pojo = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl])
}
override def afterEach() {
ActorRegistry.shutdownAll
}
describe("TypedActor") {
it("should resolve Future return from method defined to return a Future") {
val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
val future = pojo.square(10)
val future = simplePojo.square(10)
future.await
future.result.isDefined should equal (true)
future.result.get should equal (100)
}
}
describe("TypedActor object") {
it("should support finding the underlying actor for a given proxy and the proxy for a given actor") {
val typedActorRef = TypedActor.actorFor(simplePojo).get
val typedActor = typedActorRef.actor.asInstanceOf[TypedActor]
assert(typedActor.proxy === simplePojo)
assert(TypedActor.proxyFor(typedActorRef).get === simplePojo)
}
}
describe("ActorRegistry") {
it("should support finding a typed actor by uuid ") {
val typedActorRef = TypedActor.actorFor(simplePojo).get
val uuid = typedActorRef.uuid
println("### 1")
assert(ActorRegistry.typedActorFor(newUuid()) === None)
println("### 2")
assert(ActorRegistry.typedActorFor(uuid).isDefined)
println("### 3")
assert(ActorRegistry.typedActorFor(uuid).get === simplePojo)
}
it("should support finding typed actors by id ") {
val typedActors = ActorRegistry.typedActorsFor("my-custom-id")
assert(typedActors.length === 1)
assert(typedActors.contains(pojo))
// creating untyped actor with same custom id
val actorRef = Actor.actorOf[MyActor].start
val typedActors2 = ActorRegistry.typedActorsFor("my-custom-id")
assert(typedActors2.length === 1)
assert(typedActors2.contains(pojo))
actorRef.stop
}
it("should support to filter typed actors") {
val actors = ActorRegistry.filterTypedActors(ta => ta.isInstanceOf[MyTypedActor])
assert(actors.length === 1)
assert(actors.contains(pojo))
}
it("should support to find typed actors by class") {
val actors = ActorRegistry.typedActorsFor(classOf[MyTypedActorImpl])
assert(actors.length === 1)
assert(actors.contains(pojo))
assert(ActorRegistry.typedActorsFor(classOf[MyActor]).isEmpty)
}
it("should support to get all typed actors") {
val actors = ActorRegistry.typedActors
assert(actors.length === 2)
assert(actors.contains(pojo))
assert(actors.contains(simplePojo))
}
it("should support to find typed actors by manifest") {
val actors = ActorRegistry.typedActorsFor[MyTypedActorImpl]
assert(actors.length === 1)
assert(actors.contains(pojo))
assert(ActorRegistry.typedActorsFor[MyActor].isEmpty)
}
it("should support foreach for typed actors") {
val actorRef = Actor.actorOf[MyActor].start
assert(ActorRegistry.actors.size === 3)
assert(ActorRegistry.typedActors.size === 2)
ActorRegistry.foreachTypedActor(TypedActor.stop(_))
assert(ActorRegistry.actors.size === 1)
assert(ActorRegistry.typedActors.size === 0)
}
it("should shutdown all typed and untyped actors") {
val actorRef = Actor.actorOf[MyActor].start
assert(ActorRegistry.actors.size === 3)
assert(ActorRegistry.typedActors.size === 2)
ActorRegistry.shutdownAll()
assert(ActorRegistry.actors.size === 0)
assert(ActorRegistry.typedActors.size === 0)
}
}
}