ActorRegistry: now based on ConcurrentHashMap, now have extensive tests, now has actorFor(uuid): Option[Actor]

This commit is contained in:
Jonas Bonér 2010-02-28 15:56:36 +01:00
parent 9cea01dce1
commit f571c07df2
3 changed files with 289 additions and 39 deletions

View file

@ -6,81 +6,123 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
import scala.collection.mutable.{ListBuffer, HashMap} import scala.collection.mutable.ListBuffer
import scala.reflect.Manifest import scala.reflect.Manifest
import java.util.concurrent.ConcurrentHashMap
/** /**
* Registry holding all actor instances, mapped by class and the actor's id field (which can be set by user-code). * Registry holding all Actor instances in the whole system.
* Mapped by:
* <ul>
* <li>the Actor's UUID</li>
* <li>the Actor's id field (which can be set by user-code)</li>
* <li>the Actor's class</li>
* <ul>
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object ActorRegistry extends Logging { object ActorRegistry extends Logging {
private val actorsByClassName = new HashMap[String, List[Actor]] private val actorsByUUID = new ConcurrentHashMap[String, Actor]
private val actorsById = new HashMap[String, List[Actor]] private val actorsById = new ConcurrentHashMap[String, List[Actor]]
private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]]
/** /**
* Returns all actors in the system. * Returns all actors in the system.
*/ */
def actors: List[Actor] = synchronized { def actors: List[Actor] = {
val all = new ListBuffer[Actor] val all = new ListBuffer[Actor]
actorsById.values.foreach(all ++= _) val elements = actorsByUUID.elements
while (elements.hasMoreElements) all += elements.nextElement
all.toList all.toList
} }
/** /**
* Invokes a function for all actors. * Invokes a function for all actors.
*/ */
def foreach(f: (Actor) => Unit) = actors.foreach(f) def foreach(f: (Actor) => Unit) = {
val elements = actorsByUUID.elements
while (elements.hasMoreElements) f(elements.nextElement)
}
/** /**
* Finds all actors that are subtypes of the class passed in as the Manifest argument. * Finds all actors that are subtypes of the class passed in as the Manifest argument.
*/ */
def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[T] = synchronized { def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[T] = {
for (actor <- actors; if manifest.erasure.isAssignableFrom(actor.getClass)) yield actor.asInstanceOf[T] val all = new ListBuffer[T]
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val actor = elements.nextElement
if (manifest.erasure.isAssignableFrom(actor.getClass)) {
all += actor.asInstanceOf[T]
}
}
all.toList
} }
/** /**
* Finds all actors of the exact type specified by the class passed in as the Class argument. * Finds all actors of the exact type specified by the class passed in as the Class argument.
*/ */
def actorsFor[T <: Actor](clazz: Class[T]): List[T] = synchronized { def actorsFor[T <: Actor](clazz: Class[T]): List[T] = {
actorsByClassName.get(clazz.getName) match { if (actorsByClassName.containsKey(clazz.getName)) {
case None => Nil actorsByClassName.get(clazz.getName).asInstanceOf[List[T]]
case Some(instances) => instances.asInstanceOf[List[T]] } else Nil
}
} }
/** /**
* Finds all actors that has a specific id. * Finds all actors that has a specific id.
*/ */
def actorsFor(id : String): List[Actor] = synchronized { def actorsFor(id: String): List[Actor] = {
actorsById.get(id) match { if (actorsById.containsKey(id)) actorsById.get(id).asInstanceOf[List[Actor]]
case None => Nil else Nil
case Some(instances) => instances
}
} }
def register(actor: Actor) = synchronized { /**
val className = actor.getClass.getName * Finds the actor that has a specific UUID.
actorsByClassName.get(className) match { */
case Some(instances) => actorsByClassName + (className -> (actor :: instances)) def actorFor(uuid: String): Option[Actor] = {
case None => actorsByClassName + (className -> (actor :: Nil)) if (actorsByUUID.containsKey(uuid)) Some(actorsByUUID.get(uuid))
else None
} }
/**
* Registers an actor in the ActorRegistry.
*/
def register(actor: Actor) = {
// UUID
actorsByUUID.put(actor.uuid, actor)
// ID
val id = actor.getId val id = actor.getId
if (id eq null) throw new IllegalStateException("Actor.id is null " + actor) if (id eq null) throw new IllegalStateException("Actor.id is null " + actor)
actorsById.get(id) match { if (actorsById.containsKey(id)) actorsById.put(id, actor :: actorsById.get(id))
case Some(instances) => actorsById + (id -> (actor :: instances)) else actorsById.put(id, actor :: Nil)
case None => actorsById + (id -> (actor :: Nil))
} // Class name
val className = actor.getClass.getName
if (actorsByClassName.containsKey(className)) {
actorsByClassName.put(className, actor :: actorsByClassName.get(className))
} else actorsByClassName.put(className, actor :: Nil)
} }
def unregister(actor: Actor) = synchronized { /**
actorsByClassName - actor.getClass.getName * Unregisters an actor in the ActorRegistry.
actorsById - actor.getId */
def unregister(actor: Actor) = {
actorsByUUID remove actor.uuid
actorsById remove actor.getId
actorsByClassName remove actor.getClass.getName
} }
/**
* Shuts down and unregisters all actors in the system.
*/
def shutdownAll = { def shutdownAll = {
log.info("Shutting down all actors in the system...") log.info("Shutting down all actors in the system...")
actorsById.foreach(entry => entry._2.map(_.stop)) foreach(_.stop)
log.info("All actors have been shut down") actorsByUUID.clear
actorsById.clear
actorsByClassName.clear
log.info("All actors have been shut down and unregistered from ActorRegistry")
} }
} }

View file

@ -0,0 +1,212 @@
package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
/*
class ActorRegistryTest extends JUnitSuite {
val registry = ActorRegistry
@Test
def testRegistrationWithDefaultId {
val actor = new TestActor1
assertEquals(actor.getClass.getName, actor.getId)
testRegistration(actor, classOf[TestActor1])
}
@Test
def testRegistrationWithCustomId {
val actor = new TestActor2
assertEquals("customid", actor.getId)
testRegistration(actor, classOf[TestActor2])
}
private def testRegistration[T <: Actor](actor: T, actorClass: Class[T]) {
assertEquals("non-started actor registered", Nil, registry.actorsFor(actorClass))
assertEquals("non-started actor registered", Nil, registry.actorsFor(actor.getId))
assertEquals("non-started actor registered", None, registry.actorFor(actor.uuid))
actor.start
assertEquals("actor not registered", List(actor), registry.actorsFor(actorClass))
assertEquals("actor not registered", List(actor), registry.actorsFor(actor.getId))
assertEquals("actor not registered", Some(actor), registry.actorFor(actor.uuid))
actor.stop
assertEquals("stopped actor registered", Nil, registry.actorsFor(actorClass))
assertEquals("stopped actor registered", Nil, registry.actorsFor(actor.getId))
assertEquals("stopped actor registered", None, registry.actorFor(actor.uuid))
}
}
class TestActor1 extends Actor {
// use default id
protected def receive = null
}
class TestActor2 extends Actor {
id = "customid"
protected def receive = null
}
*/
class ActorRegistryTest extends JUnitSuite {
var record = ""
class TestActor extends Actor {
id = "MyID"
def receive = {
case "ping" =>
record = "pong" + record
}
}
@Test def shouldGetActorByIdFromActorRegistry = {
ActorRegistry.shutdownAll
val actor = new TestActor
actor.start
val actors = ActorRegistry.actorsFor("MyID")
assert(actors.size === 1)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId == "MyID")
actor.stop
}
@Test def shouldGetActorByUUIDFromActorRegistry = {
ActorRegistry.shutdownAll
val actor = new TestActor
val uuid = actor.uuid
actor.start
val actorOrNone = ActorRegistry.actorFor(uuid)
assert(actorOrNone.isDefined)
assert(actorOrNone.get.uuid === uuid)
actor.stop
}
@Test def shouldGetActorByClassFromActorRegistry = {
ActorRegistry.shutdownAll
val actor = new TestActor
actor.start
val actors = ActorRegistry.actorsFor(classOf[TestActor])
assert(actors.size === 1)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
actor.stop
}
@Test def shouldGetActorByManifestFromActorRegistry = {
ActorRegistry.shutdownAll
val actor = new TestActor
actor.start
val actors: List[TestActor] = ActorRegistry.actorsFor[TestActor]
assert(actors.size === 1)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
actor.stop
}
@Test def shouldGetActorsByIdFromActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val actors = ActorRegistry.actorsFor("MyID")
assert(actors.size === 2)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.last.isInstanceOf[TestActor])
assert(actors.last.getId === "MyID")
actor1.stop
actor2.stop
}
@Test def shouldGetActorsByClassFromActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val actors = ActorRegistry.actorsFor(classOf[TestActor])
assert(actors.size === 2)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.last.isInstanceOf[TestActor])
assert(actors.last.getId === "MyID")
actor1.stop
actor2.stop
}
@Test def shouldGetActorsByManifestFromActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val actors: List[TestActor] = ActorRegistry.actorsFor[TestActor]
assert(actors.size === 2)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.last.isInstanceOf[TestActor])
assert(actors.last.getId === "MyID")
actor1.stop
actor2.stop
}
@Test def shouldGetAllActorsFromActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val actors = ActorRegistry.actors
assert(actors.size === 2)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.last.isInstanceOf[TestActor])
assert(actors.last.getId === "MyID")
actor1.stop
actor2.stop
}
@Test def shouldGetResponseByAllActorsInActorRegistryWhenInvokingForeach = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
record = ""
ActorRegistry.foreach(actor => actor send "ping")
Thread.sleep(1000)
assert(record === "pongpong")
actor1.stop
actor2.stop
}
@Test def shouldShutdownAllActorsInActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
ActorRegistry.shutdownAll
assert(ActorRegistry.actors.size === 0)
}
@Test def shouldRemoveUnregisterActorInActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
assert(ActorRegistry.actors.size === 2)
ActorRegistry.unregister(actor1)
assert(ActorRegistry.actors.size === 1)
ActorRegistry.unregister(actor2)
assert(ActorRegistry.actors.size === 0)
}
}

View file

@ -76,11 +76,7 @@ trait ChatStorage extends Actor
class RedisChatStorage extends ChatStorage { class RedisChatStorage extends ChatStorage {
lifeCycle = Some(LifeCycle(Permanent)) lifeCycle = Some(LifeCycle(Permanent))
private var chatLog: PersistentVector[Array[Byte]] = _ private var chatLog = atomic { RedisStorage.getVector("akka.chat.log") }
override def initTransactionalState = chatLog = RedisStorage.getVector("akka.chat.log")
chatLog = RedisStorage.getVector("akka.chat.log")
log.info("Redis-based chat storage is starting up...") log.info("Redis-based chat storage is starting up...")