diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala index 54c8179152..c750be9631 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala @@ -14,7 +14,7 @@ import org.scalatest.matchers.MustMatchers class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { import Ticket669Spec._ - override def afterAll = Actor.registry.shutdownAll + override def afterAll = Actor.registry.local.shutdownAll "A supervised actor with lifecycle PERMANENT" should { "be able to reply on failure during preRestart" in { @@ -69,4 +69,4 @@ object Ticket669Spec { self.reply_?("failure2") } } -} \ No newline at end of file +} diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala b/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala index f5a107f511..4e3bebf663 100644 --- a/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala @@ -72,7 +72,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { /* it("should be able to join streams") { import DataFlow._ - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { stream <<< n @@ -139,7 +139,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { /* it("should be able to conditionally set variables") { import DataFlow._ - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll val latch = new CountDownLatch(1) val x, y, z, v = new DataFlowVariable[Int] diff --git a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala index 09a23dbc5c..4030b953db 100644 --- a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala @@ -32,68 +32,45 @@ object ActorRegistrySpec { class ActorRegistrySpec extends JUnitSuite { import ActorRegistrySpec._ - @Test def shouldGetActorByIdFromActorRegistry { - Actor.registry.shutdownAll - val actor = actorOf[TestActor] - actor.start - val actors = Actor.registry.actorsFor("MyID") - assert(actors.size === 1) - assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") - actor.stop + @Test def shouldGetActorByAddressFromActorRegistry { + Actor.registry.local.shutdownAll + val actor1 = actorOf[TestActor] + actor1.start + val actor2 = Actor.registry.actorFor(actor1.address) + assert(actor2.isDefined) + assert(actor2.get.address === actor1.address) + actor2.get.stop } - @Test def shouldGetActorByUUIDFromActorRegistry { - Actor.registry.shutdownAll + @Test def shouldGetActorByUUIDFromLocalActorRegistry { + Actor.registry.local.shutdownAll val actor = actorOf[TestActor] val uuid = actor.uuid actor.start - val actorOrNone = Actor.registry.actorFor(uuid) + val actorOrNone = Actor.registry.local.actorFor(uuid) assert(actorOrNone.isDefined) assert(actorOrNone.get.uuid === uuid) actor.stop } - @Test def shouldGetActorByClassFromActorRegistry { - Actor.registry.shutdownAll + @Test def shouldFindThingsFromLocalActorRegistry { + Actor.registry.local.shutdownAll val actor = actorOf[TestActor] actor.start - val actors = Actor.registry.actorsFor(classOf[TestActor]) - assert(actors.size === 1) - assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") - actor.stop - } - - @Test def shouldGetActorByManifestFromActorRegistry { - Actor.registry.shutdownAll - val actor = actorOf[TestActor] - actor.start - val actors = Actor.registry.actorsFor[TestActor] - assert(actors.size === 1) - assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") - actor.stop - } - - @Test def shouldFindThingsFromActorRegistry { - Actor.registry.shutdownAll - val actor = actorOf[TestActor] - actor.start - val found = Actor.registry.find({ case a: ActorRef if a.actor.isInstanceOf[TestActor] => a }) + val found = Actor.registry.local.find({ case a: ActorRef if a.actor.isInstanceOf[TestActor] => a }) assert(found.isDefined) assert(found.get.actor.isInstanceOf[TestActor]) assert(found.get.id === "MyID") actor.stop } - @Test def shouldGetActorsByIdFromActorRegistry { - Actor.registry.shutdownAll + @Test def shouldGetAllActorsFromLocalActorRegistry { + Actor.registry.local.shutdownAll val actor1 = actorOf[TestActor] actor1.start val actor2 = actorOf[TestActor] actor2.start - val actors = Actor.registry.actorsFor("MyID") + val actors = Actor.registry.local.actors assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) assert(actors.head.id === "MyID") @@ -103,120 +80,45 @@ class ActorRegistrySpec extends JUnitSuite { actor2.stop } - @Test def shouldGetActorsByClassFromActorRegistry { - Actor.registry.shutdownAll - val actor1 = actorOf[TestActor] - actor1.start - val actor2 = actorOf[TestActor] - actor2.start - val actors = Actor.registry.actorsFor(classOf[TestActor]) - assert(actors.size === 2) - assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") - assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.id === "MyID") - actor1.stop - actor2.stop - } - - @Test def shouldGetActorsByManifestFromActorRegistry { - Actor.registry.shutdownAll - val actor1 = actorOf[TestActor] - actor1.start - val actor2 = actorOf[TestActor] - actor2.start - val actors = Actor.registry.actorsFor[TestActor] - assert(actors.size === 2) - assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") - assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.id === "MyID") - actor1.stop - actor2.stop - } - - @Test def shouldGetActorsByMessageFromActorRegistry { - - Actor.registry.shutdownAll - val actor1 = actorOf[TestActor] - actor1.start - val actor2 = actorOf[TestActor2] - actor2.start - - val actorsForAcotrTestActor = Actor.registry.actorsFor[TestActor] - assert(actorsForAcotrTestActor.size === 1) - - val actorsForAcotrTestActor2 = Actor.registry.actorsFor[TestActor2] - assert(actorsForAcotrTestActor2.size === 1) - - val actorsForAcotr = Actor.registry.actorsFor[Actor] - assert(actorsForAcotr.size === 2) - - - val actorsForMessagePing2 = Actor.registry.actorsFor[Actor]("ping2") - assert(actorsForMessagePing2.size === 1) - - val actorsForMessagePing = Actor.registry.actorsFor[Actor]("ping") - assert(actorsForMessagePing.size === 2) - - actor1.stop - actor2.stop - } - - @Test def shouldGetAllActorsFromActorRegistry { - Actor.registry.shutdownAll - val actor1 = actorOf[TestActor] - actor1.start - val actor2 = actorOf[TestActor] - actor2.start - val actors = Actor.registry.actors - assert(actors.size === 2) - assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") - assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.id === "MyID") - actor1.stop - actor2.stop - } - - @Test def shouldGetResponseByAllActorsInActorRegistryWhenInvokingForeach { - Actor.registry.shutdownAll + @Test def shouldGetResponseByAllActorsInLocalActorRegistryWhenInvokingForeach { + Actor.registry.local.shutdownAll val actor1 = actorOf[TestActor] actor1.start val actor2 = actorOf[TestActor] actor2.start record = "" - Actor.registry.foreach(actor => actor !! "ping") + Actor.registry.local.foreach(actor => actor !! "ping") assert(record === "pongpong") actor1.stop actor2.stop } - @Test def shouldShutdownAllActorsInActorRegistry { - Actor.registry.shutdownAll + @Test def shouldShutdownAllActorsInLocalActorRegistry { + Actor.registry.local.shutdownAll val actor1 = actorOf[TestActor] actor1.start val actor2 = actorOf[TestActor] actor2.start - Actor.registry.shutdownAll - assert(Actor.registry.actors.size === 0) + Actor.registry.local.shutdownAll + assert(Actor.registry.local.actors.size === 0) } - @Test def shouldRemoveUnregisterActorInActorRegistry { - Actor.registry.shutdownAll + @Test def shouldRemoveUnregisterActorInLocalActorRegistry { + Actor.registry.local.shutdownAll val actor1 = actorOf[TestActor] actor1.start val actor2 = actorOf[TestActor] actor2.start - assert(Actor.registry.actors.size === 2) + assert(Actor.registry.local.actors.size === 2) Actor.registry.unregister(actor1) - assert(Actor.registry.actors.size === 1) + assert(Actor.registry.local.actors.size === 1) Actor.registry.unregister(actor2) - assert(Actor.registry.actors.size === 0) + assert(Actor.registry.local.actors.size === 0) } + /* @Test def shouldBeAbleToRegisterActorsConcurrently { - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll def mkTestActors = for(i <- (1 to 10).toList;j <- 1 to 3000) yield actorOf( new Actor { self.id = i.toString @@ -244,9 +146,10 @@ class ActorRegistrySpec extends JUnitSuite { for(i <- 1 to 10) { val theId = i.toString - val actors = Actor.registry.actorsFor(theId).toSet - for(a <- actors if a.id == theId) assert(actors contains a) + val actor = Actor.registry.local.actorFor(theId) + assert(actor eq a) assert(actors.size === 9000) } } + */ } diff --git a/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala index 79b09d49d1..77b84e130a 100644 --- a/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala @@ -12,7 +12,7 @@ class SchedulerSpec extends JUnitSuite { def withCleanEndState(action: => Unit) { action Scheduler.restart - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll } @@ -62,10 +62,10 @@ class SchedulerSpec extends JUnitSuite { val actor = actorOf(new Actor { def receive = { case Ping => ticks.countDown } }).start - val numActors = Actor.registry.actors.length + val numActors = Actor.registry.local.actors.length (1 to 1000).foreach( _ => Scheduler.scheduleOnce(actor,Ping,1,TimeUnit.MILLISECONDS) ) assert(ticks.await(10,TimeUnit.SECONDS)) - assert(Actor.registry.actors.length === numActors) + assert(Actor.registry.local.actors.length === numActors) } /** diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 718c3b94bb..00a280188a 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -14,6 +14,7 @@ import scala.reflect.BeanProperty import akka.util. {ReflectiveAccess, Duration} import akka.remoteinterface.RemoteSupport import akka.japi. {Creator, Procedure} +import com.eaio.uuid.UUID /** * Life-cycle messages for the Actors @@ -110,7 +111,7 @@ object Actor extends ListenerManagement { lazy val remote: RemoteSupport = { ReflectiveAccess - .Remote + .RemoteModule .defaultRemoteSupport .map(_()) .getOrElse(throw new UnsupportedOperationException("You need to have akka-remote.jar on classpath")) @@ -141,7 +142,11 @@ object Actor extends ListenerManagement { * val actor = actorOf[MyActor].start * */ - def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) + def actorOf[T <: Actor : Manifest](address: String): ActorRef = + actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], address) + + def actorOf[T <: Actor : Manifest]: ActorRef = + actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], (new UUID).toString) /** * Creates an ActorRef out of the Actor of the specified Class. @@ -157,7 +162,7 @@ object Actor extends ListenerManagement { * val actor = actorOf(classOf[MyActor]).start * */ - def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => { + def actorOf(clazz: Class[_ <: Actor], address: String): ActorRef = new LocalActorRef(() => { import ReflectiveAccess.{ createInstance, noParams, noArgs } createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( throw new ActorInitializationException( @@ -165,7 +170,9 @@ object Actor extends ListenerManagement { "\nMake sure Actor is NOT defined inside a class/trait," + "\nif so put it outside the class/trait, f.e. in a companion object," + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) - }, None) + }, None, false, address) + + def actorOf(clazz: Class[_ <: Actor]): ActorRef = actorOf(clazz, (new UUID).toString) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory function @@ -185,7 +192,9 @@ object Actor extends ListenerManagement { * val actor = actorOf(new MyActor).start * */ - def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory, None) + def actorOf(factory: => Actor, address: String): ActorRef = new LocalActorRef(() => factory, None, false, address) + + def actorOf(factory: => Actor): ActorRef = actorOf(factory, (new UUID).toString) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator) @@ -195,7 +204,17 @@ object Actor extends ListenerManagement { * This function should NOT be used for remote actors. * JAVA API */ - def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create, None) + def actorOf(creator: Creator[Actor], address: String): ActorRef = new LocalActorRef(() => creator.create, None, false, address) + + /** + * Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator) + * that creates the Actor. Please note that this function can be invoked multiple + * times if for example the Actor is supervised and needs to be restarted. + *

+ * This function should NOT be used for remote actors. + * JAVA API + */ + def actorOf(creator: Creator[Actor]): ActorRef = actorOf(creator, (new UUID).toString) /** * Use to spawn out a block of code in an event-driven actor. Will shut actor down when diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index cb28a07407..f17f0dfbe9 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -106,6 +106,13 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal @volatile var id: String = _uuid.toString + /** + * FIXME Document + */ + @BeanProperty + @volatile + var address: String = _uuid.toString // FIXME set 'address' in ActorRef and make 'val' + /** * User overridable callback/setting. *

@@ -596,8 +603,10 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal class LocalActorRef private[akka] ( private[this] val actorFactory: () => Actor, val homeAddress: Option[InetSocketAddress], - val clientManaged: Boolean = false) + val clientManaged: Boolean, + _address: String) extends ActorRef with ScalaActorRef { + this.address = _address @volatile private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] @@ -627,8 +636,9 @@ class LocalActorRef private[akka] ( __supervisor: Option[ActorRef], __hotswap: Stack[PartialFunction[Any, Unit]], __factory: () => Actor, - __homeAddress: Option[InetSocketAddress]) = { - this(__factory, __homeAddress) + __homeAddress: Option[InetSocketAddress], + __address: String) = { + this(__factory, __homeAddress, false, __address) _uuid = __uuid id = __id timeout = __timeout @@ -1207,6 +1217,8 @@ private[akka] case class RemoteActorRef private[akka] ( * //superclass, which ActorRefShared is. */ trait ActorRefShared { + def address: String + /** * Returns the uuid for the actor. */ @@ -1232,6 +1244,10 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => def id_=(id: String): Unit + def address: String + + def address_=(address: String): Unit + /** * User overridable callback/setting. *

diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index 03bef32d25..2bdfe08dcc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -4,15 +4,15 @@ package akka.actor -import scala.collection.mutable.{ListBuffer, Map} +import scala.collection.mutable.ListBuffer import scala.reflect.Manifest +import annotation.tailrec import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap} import java.util.{Set => JSet} -import annotation.tailrec import akka.util.ReflectiveAccess._ -import akka.util. {ReflectiveAccess, ReadWriteGuard, ListenerManagement} +import akka.util.{ReflectiveAccess, ReadWriteGuard, ListenerManagement} /** * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry. @@ -20,43 +20,180 @@ import akka.util. {ReflectiveAccess, ReadWriteGuard, ListenerManagement} * @author Jonas Bonér */ sealed trait ActorRegistryEvent -case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent -case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent +case class ActorRegistered(address: String, actor: ActorRef) extends ActorRegistryEvent +case class ActorUnregistered(address: String, actor: ActorRef) extends ActorRegistryEvent /** * Registry holding all Actor instances in the whole system. - * Mapped by: - *