diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala index e425451470..4099d64160 100644 --- a/akka-actor/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala @@ -10,8 +10,9 @@ import scala.reflect.Manifest import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap} import java.util.{Set => JSet} -import se.scalablesolutions.akka.util.ListenerManagement import annotation.tailrec +import se.scalablesolutions.akka.util.ListenerManagement +import se.scalablesolutions.akka.util.ReflectiveAccess._ /** * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry. @@ -109,11 +110,126 @@ object ActorRegistry extends ListenerManagement { */ def actorsFor(id: String): Array[ActorRef] = actorsById values id - /** + /** * Finds the actor that has a specific UUID. */ def actorFor(uuid: Uuid): Option[ActorRef] = Option(actorsByUUID get uuid) + /** + * Returns all typed actors in the system. + */ + def typedActors: Array[AnyRef] = filterTypedActors(_ => true) + + /** + * Invokes a function for all typed actors. + */ + def foreachTypedActor(f: (AnyRef) => Unit) = { + val elements = actorsByUUID.elements + while (elements.hasMoreElements) { + val proxy = typedActorFor(elements.nextElement) + if (proxy.isDefined) { + f(proxy.get) + } + } + } + + /** + * Invokes the function on all known typed actors until it returns Some + * Returns None if the function never returns Some + */ + def findTypedActor[T](f: PartialFunction[AnyRef,T]) : Option[T] = { + val elements = actorsByUUID.elements + while (elements.hasMoreElements) { + val proxy = typedActorFor(elements.nextElement) + if(proxy.isDefined && (f isDefinedAt proxy)) + return Some(f(proxy)) + } + None + } + + /** + * Finds all typed actors that satisfy a predicate. + */ + def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = { + TypedActorModule.ensureTypedActorEnabled + val all = new ListBuffer[AnyRef] + val elements = actorsByUUID.elements + while (elements.hasMoreElements) { + val proxy = typedActorFor(elements.nextElement) + if (proxy.isDefined && p(proxy.get)) { + all += proxy.get + } + } + all.toArray + } + + /** + * Finds all typed actors that are subtypes of the class passed in as the Manifest argument. + */ + def typedActorsFor[T <: AnyRef](implicit manifest: Manifest[T]): Array[AnyRef] = { + TypedActorModule.ensureTypedActorEnabled + typedActorsFor[T](manifest.erasure.asInstanceOf[Class[T]]) + } + + /** + * Finds any typed actor that matches T. + */ + def typedActorFor[T <: AnyRef](implicit manifest: Manifest[T]): Option[AnyRef] = { + def predicate(proxy: AnyRef) : Boolean = { + val actorRef = actorFor(proxy) + actorRef.isDefined && manifest.erasure.isAssignableFrom(actorRef.get.actor.getClass) + } + findTypedActor({ case a:AnyRef if predicate(a) => a }) + } + + /** + * Finds all typed actors of type or sub-type specified by the class passed in as the Class argument. + */ + def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = { + TypedActorModule.ensureTypedActorEnabled + def predicate(proxy: AnyRef) : Boolean = { + val actorRef = actorFor(proxy) + actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass) + } + filterTypedActors(predicate) + } + + /** + * Finds all typed actors that have a specific id. + */ + def typedActorsFor(id: String): Array[AnyRef] = { + TypedActorModule.ensureTypedActorEnabled + val actorRefs = actorsById values id + actorRefs.flatMap(typedActorFor(_)) + } + + /** + * Finds the typed actor that has a specific UUID. + */ + def typedActorFor(uuid: Uuid): Option[AnyRef] = { + TypedActorModule.ensureTypedActorEnabled + val actorRef = actorsByUUID get uuid + if (actorRef eq null) + None + else + typedActorFor(actorRef) + } + + /** + * Get the typed actor proxy for a given typed actor ref. + */ + private def typedActorFor(actorRef: ActorRef): Option[AnyRef] = { + TypedActorModule.typedActorObjectInstance.get.proxyFor(actorRef) + } + + /** + * Get the underlying typed actor for a given proxy. + */ + private def actorFor(proxy: AnyRef): Option[ActorRef] = { + TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) + } + + /** * Registers an actor in the ActorRegistry. */ @@ -145,7 +261,20 @@ object ActorRegistry extends ListenerManagement { */ def shutdownAll() { log.info("Shutting down all actors in the system...") - foreach(_.stop) + if (TypedActorModule.isTypedActorEnabled) { + val elements = actorsByUUID.elements + while (elements.hasMoreElements) { + val actorRef = elements.nextElement + val proxy = typedActorFor(actorRef) + if (proxy.isDefined) { + TypedActorModule.typedActorObjectInstance.get.stop(proxy.get) + } else { + actorRef.stop + } + } + } else { + foreach(_.stop) + } actorsByUUID.clear actorsById.clear log.info("All actors have been shut down and unregistered from ActorRegistry") diff --git a/akka-actor/src/main/scala/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/util/ReflectiveAccess.scala index 6a719d3834..617f4c37ee 100644 --- a/akka-actor/src/main/scala/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/util/ReflectiveAccess.scala @@ -154,6 +154,9 @@ object ReflectiveAccess extends Logging { type TypedActorObject = { def isJoinPoint(message: Any): Boolean def isJoinPointAndOneWay(message: Any): Boolean + def actorFor(proxy: AnyRef): Option[ActorRef] + def proxyFor(actorRef: ActorRef): Option[AnyRef] + def stop(anyRef: AnyRef) : Unit } lazy val isTypedActorEnabled = typedActorObjectInstance.isDefined diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index a4c7ddada1..3e2b924e0f 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -467,13 +467,24 @@ object TypedActor extends Logging { def stop(proxy: AnyRef): Unit = AspectInitRegistry.unregister(proxy) /** - * Get the underlying dispatcher actor for the given Typed Actor. + * Get the underlying typed actor for the given Typed Actor. */ def actorFor(proxy: AnyRef): Option[ActorRef] = ActorRegistry .actorsFor(classOf[TypedActor]) .find(a => a.actor.asInstanceOf[TypedActor].proxy == proxy) + /** + * Get the typed actor proxy for the given Typed Actor. + */ + def proxyFor(actorRef: ActorRef): Option[AnyRef] = { + if (actorRef.actor.isInstanceOf[TypedActor]) { + Some(actorRef.actor.asInstanceOf[TypedActor].proxy) + } else { + None + } + } + /** * Links an other Typed Actor to this Typed Actor. * @param supervisor the supervisor Typed Actor diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala index 7de0a8f5df..8e05deff69 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala @@ -7,25 +7,139 @@ package se.scalablesolutions.akka.actor import org.scalatest.Spec import org.scalatest.Assertions import org.scalatest.matchers.ShouldMatchers -import org.scalatest.BeforeAndAfterAll +import org.scalatest.BeforeAndAfterEach import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture; +import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture +import TypedActorSpec._ + + +object TypedActorSpec { + trait MyTypedActor { + def sendOneWay(msg: String) : Unit + } + + class MyTypedActorImpl extends TypedActor with MyTypedActor { + self.id = "my-custom-id" + def sendOneWay(msg: String) { + println("got " + msg) + } + } + + class MyActor extends Actor { + self.id = "my-custom-id" + def receive = { + case msg: String => println("got " + msg) + } + } + +} @RunWith(classOf[JUnitRunner]) class TypedActorSpec extends Spec with ShouldMatchers with - BeforeAndAfterAll { + BeforeAndAfterEach { + + var simplePojo: SimpleJavaPojo = null + var pojo: MyTypedActor = null; + + override def beforeEach() { + simplePojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl]) + pojo = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl]) + } + + override def afterEach() { + ActorRegistry.shutdownAll + } describe("TypedActor") { it("should resolve Future return from method defined to return a Future") { - val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl]) - val future = pojo.square(10) + val future = simplePojo.square(10) future.await future.result.isDefined should equal (true) future.result.get should equal (100) } } + + describe("TypedActor object") { + it("should support finding the underlying actor for a given proxy and the proxy for a given actor") { + val typedActorRef = TypedActor.actorFor(simplePojo).get + val typedActor = typedActorRef.actor.asInstanceOf[TypedActor] + assert(typedActor.proxy === simplePojo) + assert(TypedActor.proxyFor(typedActorRef).get === simplePojo) + } + } + + describe("ActorRegistry") { + it("should support finding a typed actor by uuid ") { + val typedActorRef = TypedActor.actorFor(simplePojo).get + val uuid = typedActorRef.uuid + println("### 1") + assert(ActorRegistry.typedActorFor(newUuid()) === None) + println("### 2") + assert(ActorRegistry.typedActorFor(uuid).isDefined) + println("### 3") + assert(ActorRegistry.typedActorFor(uuid).get === simplePojo) + } + + it("should support finding typed actors by id ") { + val typedActors = ActorRegistry.typedActorsFor("my-custom-id") + assert(typedActors.length === 1) + assert(typedActors.contains(pojo)) + + // creating untyped actor with same custom id + val actorRef = Actor.actorOf[MyActor].start + val typedActors2 = ActorRegistry.typedActorsFor("my-custom-id") + assert(typedActors2.length === 1) + assert(typedActors2.contains(pojo)) + actorRef.stop + } + + it("should support to filter typed actors") { + val actors = ActorRegistry.filterTypedActors(ta => ta.isInstanceOf[MyTypedActor]) + assert(actors.length === 1) + assert(actors.contains(pojo)) + } + + it("should support to find typed actors by class") { + val actors = ActorRegistry.typedActorsFor(classOf[MyTypedActorImpl]) + assert(actors.length === 1) + assert(actors.contains(pojo)) + assert(ActorRegistry.typedActorsFor(classOf[MyActor]).isEmpty) + } + + it("should support to get all typed actors") { + val actors = ActorRegistry.typedActors + assert(actors.length === 2) + assert(actors.contains(pojo)) + assert(actors.contains(simplePojo)) + } + + it("should support to find typed actors by manifest") { + val actors = ActorRegistry.typedActorsFor[MyTypedActorImpl] + assert(actors.length === 1) + assert(actors.contains(pojo)) + assert(ActorRegistry.typedActorsFor[MyActor].isEmpty) + } + + it("should support foreach for typed actors") { + val actorRef = Actor.actorOf[MyActor].start + assert(ActorRegistry.actors.size === 3) + assert(ActorRegistry.typedActors.size === 2) + ActorRegistry.foreachTypedActor(TypedActor.stop(_)) + assert(ActorRegistry.actors.size === 1) + assert(ActorRegistry.typedActors.size === 0) + } + + it("should shutdown all typed and untyped actors") { + val actorRef = Actor.actorOf[MyActor].start + assert(ActorRegistry.actors.size === 3) + assert(ActorRegistry.typedActors.size === 2) + ActorRegistry.shutdownAll() + assert(ActorRegistry.actors.size === 0) + assert(ActorRegistry.typedActors.size === 0) + } + } }