diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala
index 7ca40d1a82..9e0b1cba08 100644
--- a/akka-core/src/main/scala/actor/ActorRegistry.scala
+++ b/akka-core/src/main/scala/actor/ActorRegistry.scala
@@ -6,92 +6,123 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.util.Logging
-import scala.collection.mutable.{ListBuffer, HashMap}
+import scala.collection.mutable.ListBuffer
import scala.reflect.Manifest
+import java.util.concurrent.ConcurrentHashMap
+
/**
- * Registry holding all actor instances, mapped by class, the actor's uuid and the actor's id field (which can be set
- * by user-code).
+ * Registry holding all Actor instances in the whole system.
+ * Mapped by:
+ *
+ * - the Actor's UUID
+ * - the Actor's id field (which can be set by user-code)
+ * - the Actor's class
+ *
*
* @author Jonas Bonér
*/
object ActorRegistry extends Logging {
- private val actorsByClassName = new HashMap[String, List[Actor]]
- private val actorsById = new HashMap[String, List[Actor]]
- private val actorsByUuid = new HashMap[String, Actor]
+ private val actorsByUUID = new ConcurrentHashMap[String, Actor]
+ private val actorsById = new ConcurrentHashMap[String, List[Actor]]
+ private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]]
/**
* Returns all actors in the system.
*/
- def actors: List[Actor] = synchronized {
+ def actors: List[Actor] = {
val all = new ListBuffer[Actor]
- actorsById.values.foreach(all ++= _)
+ val elements = actorsByUUID.elements
+ while (elements.hasMoreElements) all += elements.nextElement
all.toList
}
/**
* Invokes a function for all actors.
*/
- def foreach(f: (Actor) => Unit) = actors.foreach(f)
+ def foreach(f: (Actor) => Unit) = {
+ val elements = actorsByUUID.elements
+ while (elements.hasMoreElements) f(elements.nextElement)
+ }
/**
* Finds all actors that are subtypes of the class passed in as the Manifest argument.
*/
- def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[T] = synchronized {
- for (actor <- actors; if manifest.erasure.isAssignableFrom(actor.getClass)) yield actor.asInstanceOf[T]
+ def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[T] = {
+ val all = new ListBuffer[T]
+ val elements = actorsByUUID.elements
+ while (elements.hasMoreElements) {
+ val actor = elements.nextElement
+ if (manifest.erasure.isAssignableFrom(actor.getClass)) {
+ all += actor.asInstanceOf[T]
+ }
+ }
+ all.toList
}
/**
* Finds all actors of the exact type specified by the class passed in as the Class argument.
*/
- def actorsFor[T <: Actor](clazz: Class[T]): List[T] = synchronized {
- actorsByClassName.get(clazz.getName) match {
- case None => Nil
- case Some(instances) => instances.asInstanceOf[List[T]]
- }
+ def actorsFor[T <: Actor](clazz: Class[T]): List[T] = {
+ if (actorsByClassName.containsKey(clazz.getName)) {
+ actorsByClassName.get(clazz.getName).asInstanceOf[List[T]]
+ } else Nil
}
/**
- * Finds all actors that have a specific id.
+ * Finds all actors that has a specific id.
*/
- def actorsFor(id : String): List[Actor] = synchronized {
- actorsById.get(id) match {
- case None => Nil
- case Some(instances) => instances
- }
+ def actorsFor(id: String): List[Actor] = {
+ if (actorsById.containsKey(id)) actorsById.get(id).asInstanceOf[List[Actor]]
+ else Nil
+ }
+
+ /**
+ * Finds the actor that has a specific UUID.
+ */
+ def actorFor(uuid: String): Option[Actor] = {
+ if (actorsByUUID.containsKey(uuid)) Some(actorsByUUID.get(uuid))
+ else None
}
/**
- * Finds the actor that has a specific uuid.
+ * Registers an actor in the ActorRegistry.
*/
- def actorFor(uuid : String): Option[Actor] = synchronized {
- actorsByUuid.get(uuid)
- }
+ def register(actor: Actor) = {
+ // UUID
+ actorsByUUID.put(actor.uuid, actor)
- def register(actor: Actor) = synchronized {
- val className = actor.getClass.getName
- actorsByClassName.get(className) match {
- case Some(instances) => actorsByClassName + (className -> (actor :: instances))
- case None => actorsByClassName + (className -> (actor :: Nil))
- }
+ // ID
val id = actor.getId
if (id eq null) throw new IllegalStateException("Actor.id is null " + actor)
- actorsById.get(id) match {
- case Some(instances) => actorsById + (id -> (actor :: instances))
- case None => actorsById + (id -> (actor :: Nil))
- }
- actorsByUuid + (actor.uuid -> actor)
+ if (actorsById.containsKey(id)) actorsById.put(id, actor :: actorsById.get(id))
+ else actorsById.put(id, actor :: Nil)
+
+ // Class name
+ val className = actor.getClass.getName
+ if (actorsByClassName.containsKey(className)) {
+ actorsByClassName.put(className, actor :: actorsByClassName.get(className))
+ } else actorsByClassName.put(className, actor :: Nil)
}
- def unregister(actor: Actor) = synchronized {
- actorsByClassName - actor.getClass.getName
- actorsById - actor.getId
- actorsByUuid - actor.uuid
+ /**
+ * Unregisters an actor in the ActorRegistry.
+ */
+ def unregister(actor: Actor) = {
+ actorsByUUID remove actor.uuid
+ actorsById remove actor.getId
+ actorsByClassName remove actor.getClass.getName
}
+ /**
+ * Shuts down and unregisters all actors in the system.
+ */
def shutdownAll = {
log.info("Shutting down all actors in the system...")
- actorsById.foreach(entry => entry._2.map(_.stop))
- log.info("All actors have been shut down")
+ foreach(_.stop)
+ actorsByUUID.clear
+ actorsById.clear
+ actorsByClassName.clear
+ log.info("All actors have been shut down and unregistered from ActorRegistry")
}
}
diff --git a/akka-core/src/test/scala/ActorRegistryTest.scala b/akka-core/src/test/scala/ActorRegistryTest.scala
index 4c0292abcf..faa4a46b18 100644
--- a/akka-core/src/test/scala/ActorRegistryTest.scala
+++ b/akka-core/src/test/scala/ActorRegistryTest.scala
@@ -1,55 +1,160 @@
package se.scalablesolutions.akka.actor
-import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
class ActorRegistryTest extends JUnitSuite {
-
- val registry = ActorRegistry
-
- @Test
- def testRegistrationWithDefaultId {
- val actor = new TestActor1
- assertEquals(actor.getClass.getName, actor.getId)
- testRegistration(actor, classOf[TestActor1])
+ var record = ""
+ class TestActor extends Actor {
+ id = "MyID"
+ def receive = {
+ case "ping" =>
+ record = "pong" + record
+ }
}
- @Test
- def testRegistrationWithCustomId {
- val actor = new TestActor2
- assertEquals("customid", actor.getId)
- testRegistration(actor, classOf[TestActor2])
- }
-
- private def testRegistration[T <: Actor](actor: T, actorClass: Class[T]) {
- assertEquals("non-started actor registered", Nil, registry.actorsFor(actorClass))
- assertEquals("non-started actor registered", Nil, registry.actorsFor(actor.getId))
- assertEquals("non-started actor registered", None, registry.actorFor(actor.uuid))
+ @Test def shouldGetActorByIdFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor = new TestActor
actor.start
- assertEquals("actor not registered", List(actor), registry.actorsFor(actorClass))
- assertEquals("actor not registered", List(actor), registry.actorsFor(actor.getId))
- assertEquals("actor not registered", Some(actor), registry.actorFor(actor.uuid))
+ val actors = ActorRegistry.actorsFor("MyID")
+ assert(actors.size === 1)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId == "MyID")
actor.stop
- assertEquals("stopped actor registered", Nil, registry.actorsFor(actorClass))
- assertEquals("stopped actor registered", Nil, registry.actorsFor(actor.getId))
- assertEquals("stopped actor registered", None, registry.actorFor(actor.uuid))
}
+ @Test def shouldGetActorByUUIDFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor = new TestActor
+ val uuid = actor.uuid
+ actor.start
+ val actorOrNone = ActorRegistry.actorFor(uuid)
+ assert(actorOrNone.isDefined)
+ assert(actorOrNone.get.uuid === uuid)
+ actor.stop
+ }
+
+ @Test def shouldGetActorByClassFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor = new TestActor
+ actor.start
+ val actors = ActorRegistry.actorsFor(classOf[TestActor])
+ assert(actors.size === 1)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId === "MyID")
+ actor.stop
+ }
+
+ @Test def shouldGetActorByManifestFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor = new TestActor
+ actor.start
+ val actors: List[TestActor] = ActorRegistry.actorsFor[TestActor]
+ assert(actors.size === 1)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId === "MyID")
+ actor.stop
+ }
+
+ @Test def shouldGetActorsByIdFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val actors = ActorRegistry.actorsFor("MyID")
+ assert(actors.size === 2)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId === "MyID")
+ assert(actors.last.isInstanceOf[TestActor])
+ assert(actors.last.getId === "MyID")
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldGetActorsByClassFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val actors = ActorRegistry.actorsFor(classOf[TestActor])
+ assert(actors.size === 2)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId === "MyID")
+ assert(actors.last.isInstanceOf[TestActor])
+ assert(actors.last.getId === "MyID")
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldGetActorsByManifestFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val actors: List[TestActor] = ActorRegistry.actorsFor[TestActor]
+ assert(actors.size === 2)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId === "MyID")
+ assert(actors.last.isInstanceOf[TestActor])
+ assert(actors.last.getId === "MyID")
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldGetAllActorsFromActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ val actors = ActorRegistry.actors
+ assert(actors.size === 2)
+ assert(actors.head.isInstanceOf[TestActor])
+ assert(actors.head.getId === "MyID")
+ assert(actors.last.isInstanceOf[TestActor])
+ assert(actors.last.getId === "MyID")
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldGetResponseByAllActorsInActorRegistryWhenInvokingForeach = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ record = ""
+ ActorRegistry.foreach(actor => actor send "ping")
+ Thread.sleep(1000)
+ assert(record === "pongpong")
+ actor1.stop
+ actor2.stop
+ }
+
+ @Test def shouldShutdownAllActorsInActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ ActorRegistry.shutdownAll
+ assert(ActorRegistry.actors.size === 0)
+ }
+
+ @Test def shouldRemoveUnregisterActorInActorRegistry = {
+ ActorRegistry.shutdownAll
+ val actor1 = new TestActor
+ actor1.start
+ val actor2 = new TestActor
+ actor2.start
+ assert(ActorRegistry.actors.size === 2)
+ ActorRegistry.unregister(actor1)
+ assert(ActorRegistry.actors.size === 1)
+ ActorRegistry.unregister(actor2)
+ assert(ActorRegistry.actors.size === 0)
+ }
}
-
-class TestActor1 extends Actor {
-
- // use default id
-
- protected def receive = null
-
-}
-
-class TestActor2 extends Actor {
-
- id = "customid"
-
- protected def receive = null
-
-}
\ No newline at end of file
diff --git a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala
index f605fc7cba..d03c209706 100644
--- a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala
+++ b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala
@@ -76,11 +76,7 @@ trait ChatStorage extends Actor
class RedisChatStorage extends ChatStorage {
lifeCycle = Some(LifeCycle(Permanent))
- private var chatLog: PersistentVector[Array[Byte]] = _
-
- override def initTransactionalState = chatLog = RedisStorage.getVector("akka.chat.log")
-
- chatLog = RedisStorage.getVector("akka.chat.log")
+ private var chatLog = atomic { RedisStorage.getVector("akka.chat.log") }
log.info("Redis-based chat storage is starting up...")