From 3e6decffc038cb9819253c409dc4f8c92b8232ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 7 Oct 2011 19:42:10 +0200 Subject: [PATCH] Removed the ActorRegistry, the different ActorRefProvider implementations now holds an Address->ActorRef registry. Looking up by UUID is gone together with all the other lookup methods such as 'foreach' etc. which do not make sense in a distributed env. 'shutdownAll' is also removed but will be replaced by parental supervision. --- .../src/test/java/akka/actor/JavaAPI.java | 5 - .../test/scala/akka/actor/ActorRefSpec.scala | 2 +- .../scala/akka/actor/ActorRegistrySpec.scala | 218 ++++++------- .../scala/akka/actor/DeathWatchSpec.scala | 2 +- .../test/scala/akka/actor/SchedulerSpec.scala | 27 +- .../test/scala/akka/actor/Ticket669Spec.scala | 2 +- .../akka/actor/dispatch/ActorModelSpec.scala | 69 ++-- .../akka/dispatch/MailboxConfigSpec.scala | 2 +- .../src/main/java/akka/actor/Actors.java | 13 +- .../src/main/scala/akka/actor/Actor.scala | 5 - .../src/main/scala/akka/actor/ActorCell.scala | 2 - .../src/main/scala/akka/actor/ActorRef.scala | 2 +- .../scala/akka/actor/ActorRefProvider.scala | 19 +- .../main/scala/akka/actor/ActorRegistry.scala | 299 ------------------ .../actor/BootableActorLoaderService.scala | 4 +- .../src/main/scala/akka/actor/Scheduler.scala | 12 +- .../main/scala/akka/actor/TypedActor.scala | 5 +- .../src/main/scala/akka/routing/Routing.scala | 8 +- akka-http/src/main/scala/akka/http/Mist.scala | 2 +- .../akka/remote/RemoteActorRefProvider.scala | 8 +- .../akka/remote/NetworkFailureSpec.scala | 2 +- .../main/scala/akka/testkit/TestFSMRef.scala | 1 - .../scala/akka/testkit/TestFSMRefSpec.scala | 6 +- 23 files changed, 208 insertions(+), 507 deletions(-) delete mode 100644 akka-actor/src/main/scala/akka/actor/ActorRegistry.scala diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java index 5a4d415fb5..cdef83f7b8 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java @@ -13,11 +13,6 @@ public class JavaAPI { assertNotNull(remote); } - @Test void mustInteractWithActorRegistry() { - final ActorRegistry registry = Actors.registry(); - assertNotNull(registry); - } - @Test void mustBeAbleToCreateActorRefFromClass() { ActorRef ref = Actors.actorOf(JavaAPITestActor.class); assertNotNull(ref); diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 3800a482d4..65e16c036f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -262,7 +262,7 @@ class ActorRefSpec extends WordSpec with MustMatchers with TestKit { val latch = new CountDownLatch(1) val a = actorOf(new InnerActor { override def postStop { - Actor.registry.unregister(self) + // Actor.registry.unregister(self) latch.countDown } }) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRegistrySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRegistrySpec.scala index 231edd37c0..99082d1699 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRegistrySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRegistrySpec.scala @@ -1,128 +1,128 @@ -package akka.actor +// package akka.actor -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers -import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } -import akka.testkit._ -import Actor._ -import java.util.concurrent.{ ConcurrentLinkedQueue, CyclicBarrier, TimeUnit, CountDownLatch } -import akka.dispatch.Future +// import org.scalatest.WordSpec +// import org.scalatest.matchers.MustMatchers +// import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } +// import akka.testkit._ +// import Actor._ +// import java.util.concurrent.{ ConcurrentLinkedQueue, CyclicBarrier, TimeUnit, CountDownLatch } +// import akka.dispatch.Future -object ActorRegistrySpec { +// object ActorRegistrySpec { - class TestActor extends Actor { - def receive = { - case "ping" ⇒ reply("got ping") - } - } +// class TestActor extends Actor { +// def receive = { +// case "ping" ⇒ reply("got ping") +// } +// } - class StartStopTestActor(startedLatch: TestLatch, stoppedLatch: TestLatch) extends Actor { - override def preStart = { - startedLatch.countDown - } +// class StartStopTestActor(startedLatch: TestLatch, stoppedLatch: TestLatch) extends Actor { +// override def preStart = { +// startedLatch.countDown +// } - def receive = { - case "ping" ⇒ reply("got ping") - } +// def receive = { +// case "ping" ⇒ reply("got ping") +// } - override def postStop = { - stoppedLatch.countDown - } - } -} +// override def postStop = { +// stoppedLatch.countDown +// } +// } +// } -class ActorRegistrySpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach { - import ActorRegistrySpec._ +// class ActorRegistrySpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach { +// import ActorRegistrySpec._ - override def afterAll = { - Actor.registry.local.shutdownAll - akka.event.EventHandler.start() - } +// override def afterAll = { +// Actor.registry.local.shutdownAll +// akka.event.EventHandler.start() +// } - override def beforeEach = { - Actor.registry.local.shutdownAll - } +// override def beforeEach = { +// Actor.registry.local.shutdownAll +// } - "Actor Registry" must { +// "Actor Registry" must { - "get actor by address from registry" ignore { - val started = TestLatch(1) - val stopped = TestLatch(1) - val actor = actorOf(new StartStopTestActor(started, stopped), "test-actor-1") - started.await() - val registered = Actor.registry.actorFor(actor.address) - registered.isDefined must be(true) - registered.get.address must be(actor.address) - registered.get.address must be("test-actor-1") - registered.get.stop - stopped.await - Actor.registry.actorFor(actor.address).isEmpty must be(true) - } +// "get actor by address from registry" ignore { +// val started = TestLatch(1) +// val stopped = TestLatch(1) +// val actor = actorOf(new StartStopTestActor(started, stopped), "test-actor-1") +// started.await() +// val registered = Actor.registry.actorFor(actor.address) +// registered.isDefined must be(true) +// registered.get.address must be(actor.address) +// registered.get.address must be("test-actor-1") +// registered.get.stop +// stopped.await +// Actor.registry.actorFor(actor.address).isEmpty must be(true) +// } - "get actor by uuid from local registry" ignore { - val started = TestLatch(1) - val stopped = TestLatch(1) - val actor = actorOf(new StartStopTestActor(started, stopped), "test-actor-1") - started.await - val uuid = actor.uuid - val registered = Actor.registry.local.actorFor(uuid) - registered.isDefined must be(true) - registered.get.uuid must be(uuid) - registered.get.address must be("test-actor-1") - actor.stop - stopped.await - Actor.registry.local.actorFor(uuid).isEmpty must be(true) - } +// "get actor by uuid from local registry" ignore { +// val started = TestLatch(1) +// val stopped = TestLatch(1) +// val actor = actorOf(new StartStopTestActor(started, stopped), "test-actor-1") +// started.await +// val uuid = actor.uuid +// val registered = Actor.registry.local.actorFor(uuid) +// registered.isDefined must be(true) +// registered.get.uuid must be(uuid) +// registered.get.address must be("test-actor-1") +// actor.stop +// stopped.await +// Actor.registry.local.actorFor(uuid).isEmpty must be(true) +// } - "find things from local registry" ignore { - val actor = actorOf[TestActor]("test-actor-1") - val found: Option[LocalActorRef] = Actor.registry.local.find({ case a: LocalActorRef if a.underlyingActorInstance.isInstanceOf[TestActor] ⇒ a }) - found.isDefined must be(true) - found.get.underlyingActorInstance.isInstanceOf[TestActor] must be(true) - found.get.address must be("test-actor-1") - actor.stop - } +// "find things from local registry" ignore { +// val actor = actorOf[TestActor]("test-actor-1") +// val found: Option[LocalActorRef] = Actor.registry.local.find({ case a: LocalActorRef if a.underlyingActorInstance.isInstanceOf[TestActor] ⇒ a }) +// found.isDefined must be(true) +// found.get.underlyingActorInstance.isInstanceOf[TestActor] must be(true) +// found.get.address must be("test-actor-1") +// actor.stop +// } - "get all actors from local registry" ignore { - val actor1 = actorOf[TestActor]("test-actor-1") - val actor2 = actorOf[TestActor]("test-actor-2") - val actors = Actor.registry.local.actors - actors.size must be(2) - actors.find(_.address == "test-actor-2").get.asInstanceOf[LocalActorRef].underlyingActorInstance.isInstanceOf[TestActor] must be(true) - actors.find(_.address == "test-actor-1").get.asInstanceOf[LocalActorRef].underlyingActorInstance.isInstanceOf[TestActor] must be(true) - actor1.stop - actor2.stop - } +// "get all actors from local registry" ignore { +// val actor1 = actorOf[TestActor]("test-actor-1") +// val actor2 = actorOf[TestActor]("test-actor-2") +// val actors = Actor.registry.local.actors +// actors.size must be(2) +// actors.find(_.address == "test-actor-2").get.asInstanceOf[LocalActorRef].underlyingActorInstance.isInstanceOf[TestActor] must be(true) +// actors.find(_.address == "test-actor-1").get.asInstanceOf[LocalActorRef].underlyingActorInstance.isInstanceOf[TestActor] must be(true) +// actor1.stop +// actor2.stop +// } - "get response from all actors in local registry using foreach" ignore { - val actor1 = actorOf[TestActor]("test-actor-1") - val actor2 = actorOf[TestActor]("test-actor-2") - val results = new ConcurrentLinkedQueue[Future[String]] +// "get response from all actors in local registry using foreach" ignore { +// val actor1 = actorOf[TestActor]("test-actor-1") +// val actor2 = actorOf[TestActor]("test-actor-2") +// val results = new ConcurrentLinkedQueue[Future[String]] - Actor.registry.local.foreach(actor ⇒ results.add(actor.?("ping").mapTo[String])) +// Actor.registry.local.foreach(actor ⇒ results.add(actor.?("ping").mapTo[String])) - results.size must be(2) - val i = results.iterator - while (i.hasNext) assert(i.next.get === "got ping") - actor1.stop() - actor2.stop() - } +// results.size must be(2) +// val i = results.iterator +// while (i.hasNext) assert(i.next.get === "got ping") +// actor1.stop() +// actor2.stop() +// } - "shutdown all actors in local registry" ignore { - val actor1 = actorOf[TestActor]("test-actor-1") - val actor2 = actorOf[TestActor]("test-actor-2") - Actor.registry.local.shutdownAll - Actor.registry.local.actors.size must be(0) - } +// "shutdown all actors in local registry" ignore { +// val actor1 = actorOf[TestActor]("test-actor-1") +// val actor2 = actorOf[TestActor]("test-actor-2") +// Actor.registry.local.shutdownAll +// Actor.registry.local.actors.size must be(0) +// } - "remove when unregistering actors from local registry" ignore { - val actor1 = actorOf[TestActor]("test-actor-1") - val actor2 = actorOf[TestActor]("test-actor-2") - Actor.registry.local.actors.size must be(2) - Actor.registry.unregister(actor1) - Actor.registry.local.actors.size must be(1) - Actor.registry.unregister(actor2) - Actor.registry.local.actors.size must be(0) - } - } -} +// "remove when unregistering actors from local registry" ignore { +// val actor1 = actorOf[TestActor]("test-actor-1") +// val actor2 = actorOf[TestActor]("test-actor-2") +// Actor.registry.local.actors.size must be(2) +// Actor.registry.unregister(actor1) +// Actor.registry.local.actors.size must be(1) +// Actor.registry.unregister(actor2) +// Actor.registry.local.actors.size must be(0) +// } +// } +// } diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 0757a955ad..59453a110a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -20,7 +20,7 @@ class DeathWatchSpec extends WordSpec with MustMatchers with TestKit with Before import DeathWatchSpec._ "The Death Watch" must { - def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, "stopped") { + def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(2 seconds, "stopped") { case Terminated(`actorRef`, ex: ActorKilledException) if ex.getMessage == "Stopped" ⇒ true } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 89bc8728db..12f0b2343a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -27,7 +27,7 @@ class SchedulerSpec extends JUnitSuite { @After def afterEach { while (futures.peek() ne null) { Option(futures.poll()).foreach(_.cancel(true)) } - Actor.registry.local.shutdownAll + // Actor.registry.local.shutdownAll EventHandler.start() } @@ -73,18 +73,19 @@ class SchedulerSpec extends JUnitSuite { /** * ticket #372 */ - @Test - def schedulerShouldntCreateActors = { - object Ping - val ticks = new CountDownLatch(1000) - val actor = actorOf(new Actor { - def receive = { case Ping ⇒ ticks.countDown } - }) - val numActors = Actor.registry.local.actors.length - (1 to 1000).foreach(_ ⇒ collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS))) - assert(ticks.await(10, TimeUnit.SECONDS)) - assert(Actor.registry.local.actors.length === numActors) - } + // FIXME rewrite the test so that registry is not used + // @Test + // def schedulerShouldntCreateActors = { + // object Ping + // val ticks = new CountDownLatch(1000) + // val actor = actorOf(new Actor { + // def receive = { case Ping ⇒ ticks.countDown } + // }) + // val numActors = Actor.registry.local.actors.length + // (1 to 1000).foreach(_ ⇒ collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS))) + // assert(ticks.await(10, TimeUnit.SECONDS)) + // assert(Actor.registry.local.actors.length === numActors) + // } /** * ticket #372 diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index 4f16c1ec03..a998c1b69e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -16,7 +16,7 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll wi override def beforeAll = Thread.interrupted() //remove interrupted status. override def afterAll = { - Actor.registry.local.shutdownAll + // Actor.registry.local.shutdownAll akka.event.EventHandler.start() } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 71d79c580b..99f581b1de 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -391,45 +391,46 @@ abstract class ActorModelSpec extends JUnitSuite { suspensions = 1, resumes = 1) } + // FIXME rewrite so we don't use the registr.foreach @Test def dispatcherShouldHandleWavesOfActors { - implicit val dispatcher = newInterceptedDispatcher + // implicit val dispatcher = newInterceptedDispatcher - def flood(num: Int) { - val cachedMessage = CountDownNStop(new CountDownLatch(num)) - (1 to num) foreach { _ ⇒ - newTestActor ! cachedMessage - } - try { - assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns") - } catch { - case e ⇒ - System.err.println("Error: " + e.getMessage + " missing count downs == " + cachedMessage.latch.getCount() + " out of " + num) - //EventHandler.error(new Exception with NoStackTrace, null, cachedMessage.latch.getCount()) - } - } - for (run ← 1 to 3) { - flood(40000) - try { - assertDispatcher(dispatcher)(starts = run, stops = run) - } catch { - case e ⇒ + // def flood(num: Int) { + // val cachedMessage = CountDownNStop(new CountDownLatch(num)) + // (1 to num) foreach { _ ⇒ + // newTestActor ! cachedMessage + // } + // try { + // assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns") + // } catch { + // case e ⇒ + // System.err.println("Error: " + e.getMessage + " missing count downs == " + cachedMessage.latch.getCount() + " out of " + num) + // //EventHandler.error(new Exception with NoStackTrace, null, cachedMessage.latch.getCount()) + // } + // } + // for (run ← 1 to 3) { + // flood(40000) + // try { + // assertDispatcher(dispatcher)(starts = run, stops = run) + // } catch { + // case e ⇒ - Actor.registry.local.foreach { - case actor: LocalActorRef ⇒ - val cell = actor.underlying - val mbox = cell.mailbox - System.err.println("Left in the registry: " + actor.address + " => " + cell + " => " + mbox.hasMessages + " " + mbox.hasSystemMessages + " " + mbox.numberOfMessages + " " + mbox.isScheduled) - var message = mbox.dequeue() - while (message ne null) { - System.err.println("Lingering message for " + cell + " " + message) - message = mbox.dequeue() - } - } + // Actor.registry.local.foreach { + // case actor: LocalActorRef ⇒ + // val cell = actor.underlying + // val mbox = cell.mailbox + // System.err.println("Left in the registry: " + actor.address + " => " + cell + " => " + mbox.hasMessages + " " + mbox.hasSystemMessages + " " + mbox.numberOfMessages + " " + mbox.isScheduled) + // var message = mbox.dequeue() + // while (message ne null) { + // System.err.println("Lingering message for " + cell + " " + message) + // message = mbox.dequeue() + // } + // } - throw e - } - } + // throw e + // } + // } } @Test diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 130881b3a0..0ff2421da4 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -9,7 +9,7 @@ import java.util.concurrent.{ TimeUnit, CountDownLatch, BlockingQueue } import java.util.{ Queue } import akka.util._ import akka.util.Duration._ -import akka.actor.{ LocalActorRef, Actor, ActorRegistry, NullChannel } +import akka.actor.{ LocalActorRef, Actor, NullChannel } @RunWith(classOf[JUnitRunner]) abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach { diff --git a/akka-actor/src/main/java/akka/actor/Actors.java b/akka-actor/src/main/java/akka/actor/Actors.java index 35b99b5d13..88e3cc86fa 100644 --- a/akka-actor/src/main/java/akka/actor/Actors.java +++ b/akka-actor/src/main/java/akka/actor/Actors.java @@ -16,12 +16,21 @@ import com.eaio.uuid.UUID; * - locating actors */ public class Actors { + + /** + * + * @return The actor provider + */ + public static ActorRefProviders provider() { + return Actor$.MODULE$.provider(); + } + /** * * @return The actor registry */ - public static ActorRegistry registry() { - return Actor$.MODULE$.registry(); + public static ActorRefProviders registry() { + return Actor$.MODULE$.provider(); } /** diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 9d835ed3d2..51a9acb982 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -170,11 +170,6 @@ object Actor { */ val provider = new ActorRefProviders - /** - * Handle to the ActorRegistry. - */ - val registry = new ActorRegistry - /** * Handle to the ClusterNode. API for the cluster client. */ diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 8ac65b22df..cdf43c8560 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -245,7 +245,6 @@ private[akka] class ActorCell( } } - Actor.registry.register(self) dispatcher.attach(this) } @@ -379,7 +378,6 @@ private[akka] class ActorCell( receiveTimeout = None cancelReceiveTimeout Actor.provider.evict(self.address) - Actor.registry.unregister(self) dispatcher.detach(this) try { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 4dad678d02..d97e3551d0 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -418,7 +418,7 @@ case class SerializedActorRef(uuid: Uuid, port: Int, timeout: Long) { @throws(classOf[java.io.ObjectStreamException]) - def readResolve(): AnyRef = Actor.registry.local.actorFor(uuid) match { + def readResolve(): AnyRef = Actor.provider.actorFor(address) match { case Some(actor) ⇒ actor case None ⇒ //TODO FIXME Add case for when hostname+port == remote.address.hostname+port, should return a DeadActorRef or something diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 47fd36ccf7..c6ae1b11fc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -15,7 +15,7 @@ trait ActorRefProvider { def actorOf(props: Props, address: String): Option[ActorRef] - def findActorRef(address: String): Option[ActorRef] + def actorFor(address: String): Option[ActorRef] private[akka] def evict(address: String): Boolean } @@ -73,21 +73,21 @@ private[akka] class ActorRefProviders( providersAsList.map(_.getClass.getName).mkString(", ") + "]")) } - def findActorRef(address: String): Option[ActorRef] = { + def actorFor(address: String): Option[ActorRef] = { @annotation.tailrec - def findActorRef(address: String, providers: List[ActorRefProvider]): Option[ActorRef] = { + def actorFor(address: String, providers: List[ActorRefProvider]): Option[ActorRef] = { providers match { case Nil ⇒ None case provider :: rest ⇒ - provider.findActorRef(address) match { - case None ⇒ findActorRef(address, rest) // recur + provider.actorFor(address) match { + case None ⇒ actorFor(address, rest) // recur case ref ⇒ ref } } } - findActorRef(address, providersAsList) + actorFor(address, providersAsList) } /** @@ -130,7 +130,10 @@ class LocalActorRefProvider extends ActorRefProvider { def actorOf(props: Props, address: String): Option[ActorRef] = actorOf(props, address, false) - def findActorRef(address: String): Option[ActorRef] = Actor.registry.local.actorFor(address) + def actorFor(address: String): Option[ActorRef] = actors.get(address) match { + case null ⇒ None + case future ⇒ future.await.resultOrException.getOrElse(None) + } /** * Returns true if the actor was in the provider's cache and evicted successfully, else false. @@ -182,8 +185,6 @@ class LocalActorRefProvider extends ActorRefProvider { throw e } - actor foreach Actor.registry.register // only for ActorRegistry backward compat, will be removed later - newFuture completeWithResult actor actor diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala deleted file mode 100644 index a04038b40c..0000000000 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ /dev/null @@ -1,299 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.actor - -import scala.collection.mutable.ListBuffer - -import java.util.concurrent.ConcurrentHashMap - -import akka.util.ListenerManagement -import reflect.BeanProperty - -/** - * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry. - * - * @author Jonas Bonér - */ -sealed trait ActorRegistryEvent -case class ActorRegistered(@BeanProperty address: String, @BeanProperty actor: ActorRef) extends ActorRegistryEvent -case class ActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef) extends ActorRegistryEvent -case class TypedActorRegistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent -case class TypedActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent - -/** - * Registry holding all Actor instances in the whole system. - * Mapped by address which is a unique string. - * - * @author Jonas Bonér - */ -private[actor] final class ActorRegistry private[actor] () extends ListenerManagement { - private val actorsByAddress = new ConcurrentHashMap[String, ActorRef] - private val actorsByUuid = new ConcurrentHashMap[Uuid, ActorRef] - private val typedActorsByUuid = new ConcurrentHashMap[Uuid, AnyRef] - - val local = new LocalActorRegistry(actorsByAddress, actorsByUuid, typedActorsByUuid) - - /** - * Finds the actor that has a specific address. - */ - def actorFor(address: String): Option[ActorRef] = Option(actorsByAddress.get(address)) - - /** - * Finds the typed actors that have a specific address. - */ - def typedActorFor(address: String): Option[AnyRef] = - actorFor(address) map (typedActorFor(_)) - - /** - * Registers an actor in the ActorRegistry. - */ - private[akka] def register(actor: ActorRef) { - val address = actor.address - - // FIXME: this check is nice but makes serialization/deserialization specs break - //if (actorsByAddress.containsKey(address) || registeredInCluster(address)) - // throw new IllegalStateException("Actor 'address' [" + address + "] is already in use, can't register actor [" + actor + "]") - actorsByAddress.put(address, actor) - actorsByUuid.put(actor.uuid, actor) - notifyListeners(ActorRegistered(address, actor)) - } - - private[akka] def registerTypedActor(actorRef: ActorRef, proxy: AnyRef) { - if (typedActorsByUuid.putIfAbsent(actorRef.uuid, proxy) eq null) - notifyListeners(TypedActorRegistered(actorRef.address, actorRef, proxy)) - } - - private[akka] def unregisterTypedActor(actorRef: ActorRef, proxy: AnyRef) { - if (typedActorsByUuid.remove(actorRef.uuid, proxy)) - notifyListeners(TypedActorUnregistered(actorRef.address, actorRef, proxy)) - } - - /** - * Unregisters an actor in the ActorRegistry. - */ - private[akka] def unregister(address: String) { - val actor = actorsByAddress remove address - actorsByUuid remove actor.uuid - notifyListeners(ActorUnregistered(address, actor)) - } - - /** - * Unregisters an actor in the ActorRegistry. - */ - private[akka] def unregister(actor: ActorRef) { - val address = actor.address - actorsByAddress remove address - actorsByUuid remove actor.uuid - notifyListeners(ActorUnregistered(address, actor)) - - //Safe cleanup (if called from the outside) - val proxy = typedActorsByUuid.remove(actor.uuid) - if (proxy ne null) - notifyListeners(TypedActorUnregistered(address, actor, proxy)) - } - - /** - * Registers an actor in the Cluster ActorRegistry. - */ - // private[akka] def registerInCluster[T <: Actor]( - // address: String, actorRef: ActorRef, replicas: Int, serializeMailbox: Boolean = false)(implicit format: Serializer) { - // // FIXME: implement ActorRegistry.registerInCluster(..) - // } - - /** - * Unregisters an actor in the Cluster ActorRegistry. - */ - // private[akka] def unregisterInCluster(address: String) { - // ClusterModule.node.remove(address) - // } - - /** - * Get the typed actor proxy for a given typed actor ref. - */ - private def typedActorFor(actorRef: ActorRef): Option[AnyRef] = - Option(typedActorsByUuid.get(actorRef.uuid)) -} - -/** - * Projection over the local actor registry. - */ -class LocalActorRegistry( - private val actorsByAddress: ConcurrentHashMap[String, ActorRef], - private val actorsByUuid: ConcurrentHashMap[Uuid, ActorRef], - private val typedActorsByUuid: ConcurrentHashMap[Uuid, AnyRef]) { - - // NOTE: currently ClusterActorRef's are only taken into account in 'actorFor(..)' - not in 'find', 'filter' etc. - private val clusterActorRefsByAddress = new ConcurrentHashMap[String, ActorRef] - private val clusterActorRefsByUuid = new ConcurrentHashMap[Uuid, ActorRef] - - /** - * Returns the number of actors in the system. - */ - def size: Int = actorsByAddress.size - - /** - * Shuts down and unregisters all actors in the system. - */ - def shutdownAll() { - foreach(_.stop) - actorsByAddress.clear() - actorsByUuid.clear() - typedActorsByUuid.clear() - } - - //============== ACTORS ============== - - /** - * Finds the actor that have a specific address. - * - * If a ClusterActorRef exists in the registry, then return that before we look after a LocalActorRef. - */ - def actorFor(address: String): Option[ActorRef] = { - if (clusterActorRefsByAddress.containsKey(address)) Some(clusterActorRefsByAddress.get(address)) - else if (actorsByAddress.containsKey(address)) Some(actorsByAddress.get(address)) - else None - } - - private[akka] def actorFor(uuid: Uuid): Option[ActorRef] = - if (clusterActorRefsByUuid.containsKey(uuid)) Some(clusterActorRefsByUuid.get(uuid)) - else if (actorsByUuid.containsKey(uuid)) Some(actorsByUuid.get(uuid)) - else None - - // By-passes checking the registry for ClusterActorRef and only returns possible LocalActorRefs - private[akka] def localActorRefFor(address: String): Option[ActorRef] = { - if (actorsByAddress.containsKey(address)) Some(actorsByAddress.get(address)) - else None - } - - // By-passes checking the registry for ClusterActorRef and only returns possible LocalActorRefs - private[akka] def localActorRefFor(uuid: Uuid): Option[ActorRef] = - if (actorsByUuid.containsKey(uuid)) Some(actorsByUuid.get(uuid)) - else None - - /** - * Finds the typed actor that have a specific address. - */ - def typedActorFor(address: String): Option[AnyRef] = - actorFor(address) map (typedActorFor(_)) getOrElse None - - /** - * Finds the typed actor that have a specific uuid. - */ - private[akka] def typedActorFor(uuid: Uuid): Option[AnyRef] = - Option(typedActorsByUuid.get(uuid)) - - /** - * Returns all actors in the system. - */ - def actors: Array[ActorRef] = filter(_ ⇒ true) - - /** - * Invokes a function for all actors. - */ - def foreach(f: (ActorRef) ⇒ Unit) = { - val elements = actorsByAddress.elements - while (elements.hasMoreElements) f(elements.nextElement) - } - - /** - * Invokes the function on all known actors until it returns Some - * Returns None if the function never returns Some - */ - def find[T](f: PartialFunction[ActorRef, T]): Option[T] = { - val elements = actorsByAddress.elements - while (elements.hasMoreElements) { - val element = elements.nextElement - if (f isDefinedAt element) return Some(f(element)) - } - None - } - - /** - * Finds all actors that satisfy a predicate. - */ - def filter(p: ActorRef ⇒ Boolean): Array[ActorRef] = { - val all = new ListBuffer[ActorRef] - val elements = actorsByAddress.elements - while (elements.hasMoreElements) { - val actorId = elements.nextElement - if (p(actorId)) all += actorId - } - all.toArray - } - - //============== TYPED ACTORS ============== - - /** - * Returns all typed actors in the system. - */ - def typedActors: Array[AnyRef] = filterTypedActors(_ ⇒ true) - - /** - * Invokes a function for all typed actors. - */ - def foreachTypedActor(f: (AnyRef) ⇒ Unit) = { - val i = typedActorsByUuid.values.iterator - while (i.hasNext) - f(i.next) - } - - /** - * Invokes the function on all known typed actors until it returns Some - * Returns None if the function never returns Some - */ - def findTypedActor[T](f: PartialFunction[AnyRef, T]): Option[T] = { - val i = typedActorsByUuid.values.iterator - while (i.hasNext) { - val proxy = i.next - if (f isDefinedAt proxy) return Some(f(proxy)) - } - - None - } - - /** - * Finds all typed actors that satisfy a predicate. - */ - def filterTypedActors(p: AnyRef ⇒ Boolean): Array[AnyRef] = { - val all = new ListBuffer[AnyRef] - val i = typedActorsByUuid.values.iterator - while (i.hasNext) { - val proxy = i.next - if (p(proxy)) all += proxy - } - - all.toArray - } - - /** - * Get the typed actor proxy for a given typed actor ref. - */ - private def typedActorFor(actorRef: ActorRef): Option[AnyRef] = - typedActorFor(actorRef.uuid) - - /** - * Registers an ClusterActorRef in the ActorRegistry. - */ - private[akka] def registerClusterActorRef(actor: ActorRef) { - val address = actor.address - clusterActorRefsByAddress.put(address, actor) - clusterActorRefsByUuid.put(actor.uuid, actor) - } - - /** - * Unregisters an ClusterActorRef in the ActorRegistry. - */ - private[akka] def unregisterClusterActorRef(address: String) { - val actor = clusterActorRefsByAddress remove address - clusterActorRefsByUuid remove actor.uuid - } - - /** - * Unregisters an ClusterActorRef in the ActorRegistry. - */ - private[akka] def unregisterClusterActorRef(actor: ActorRef) { - unregisterClusterActorRef(actor.address) - } -} diff --git a/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala b/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala index 599e677e7b..5fac4f29c6 100644 --- a/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala +++ b/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala @@ -57,7 +57,9 @@ trait BootableActorLoaderService extends Bootable { abstract override def onUnload = { super.onUnload - Actor.registry.local.shutdownAll + + // FIXME shutdown all actors + //Actor.registry.local.shutdownAll } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 9c93e2dbdd..0dd9a30bfa 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -28,17 +28,7 @@ object Scheduler { private[akka] val service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = { - receiver match { - case local: LocalActorRef ⇒ - val uuid = local.uuid - new Runnable { - def run = Actor.registry.local.actorFor(uuid) match { - case None ⇒ if (throwWhenReceiverExpired) throw new RuntimeException("Receiver not found, unregistered") - case Some(actor) ⇒ actor ! message - } - } - case other ⇒ new Runnable { def run = other ! message } - } + new Runnable { def run = receiver ! message } } /** diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index c174dffce3..c3a54a8f83 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -232,8 +232,9 @@ object TypedActor { private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: ⇒ T) extends Actor { - override def preStart = Actor.registry.registerTypedActor(self, proxyVar.get) //Make sure actor registry knows about this actor - override def postStop = Actor.registry.unregisterTypedActor(self, proxyVar.get) + // FIXME TypedActor register/unregister on postStop/preStart + // override def preStart = Actor.registry.registerTypedActor(self, proxyVar.get) //Make sure actor registry knows about this actor + // override def postStop = Actor.registry.unregisterTypedActor(self, proxyVar.get) val me = createInstance def receive = { diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 2fbb92631c..4e6ece4215 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -66,10 +66,16 @@ object Routing { sealed trait RoutingMessage + /** + * Used to broadcast a message to all connections in a router. E.g. every connection gets the message + * regardless of their routing algorithm. + */ case class Broadcast(message: Any) extends RoutingMessage /** - * FIXME: will very likely be moved to the ActorRef. + * Creates (or fetches) a routed actor reference, configured by the 'props: RoutedProps' configuration. + * + * FIXME: will very likely be moved to the ActorRefProvider. */ def actorOf(props: RoutedProps, address: String = newUuid().toString): ActorRef = { //TODO Implement support for configuring by deployment ID etc diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala index 8c181ea47b..ff05bcc826 100644 --- a/akka-http/src/main/scala/akka/http/Mist.scala +++ b/akka-http/src/main/scala/akka/http/Mist.scala @@ -139,7 +139,7 @@ trait RootEndpointLocator { def configureRoot(address: String) { def findRoot(address: String): ActorRef = - Actor.registry.actorFor(address).getOrElse( + Actor.provider.actorFor(address).getOrElse( throw new ConfigurationException("akka.http.root-actor-id configuration option does not have a valid actor address [" + address + "]")) root = if ((address eq null) || address == "") findRoot(MistSettings.RootActorID) else findRoot(address) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index f5cb3ba18b..12f1cdba39 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -32,6 +32,7 @@ class RemoteActorRefProvider extends ActorRefProvider { import akka.dispatch.Promise private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]] + private val remoteDaemonConnectionManager = new RemoteConnectionManager(failureDetector = new BannagePeriodFailureDetector(60 seconds)) // FIXME make timeout configurable def actorOf(props: Props, address: String): Option[ActorRef] = { @@ -119,7 +120,7 @@ class RemoteActorRefProvider extends ActorRefProvider { throw e } - actor foreach Actor.registry.register // only for ActorRegistry backward compat, will be removed later + // actor foreach Actor.registry.register // only for ActorRegistry backward compat, will be removed later newFuture completeWithResult actor actor @@ -129,7 +130,10 @@ class RemoteActorRefProvider extends ActorRefProvider { } } - def findActorRef(address: String): Option[ActorRef] = throw new UnsupportedOperationException + def actorFor(address: String): Option[ActorRef] = actors.get(address) match { + case null ⇒ None + case future ⇒ future.await.resultOrException.getOrElse(None) + } /** * Returns true if the actor was in the provider's cache and evicted successfully, else false. diff --git a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala index d81e830116..fa1b340490 100644 --- a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala @@ -11,7 +11,7 @@ import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith import akka.remote.netty.NettyRemoteSupport -import akka.actor.{ Actor, ActorRegistry } +import akka.actor.Actor import java.util.concurrent.{ TimeUnit, CountDownLatch } import java.util.concurrent.atomic.AtomicBoolean diff --git a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala index 783456e223..8d7ebebb10 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala @@ -82,5 +82,4 @@ object TestFSMRef { def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D]): TestFSMRef[S, D, T] = new TestFSMRef(Props(creator = () ⇒ factory), new UUID().toString) def apply[S, D, T <: Actor](factory: ⇒ T, address: String)(implicit ev: T <:< FSM[S, D]): TestFSMRef[S, D, T] = new TestFSMRef(Props(creator = () ⇒ factory), address) - } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestFSMRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestFSMRefSpec.scala index 016dbc35e5..b2bff0f99d 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestFSMRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestFSMRefSpec.scala @@ -25,7 +25,7 @@ class TestFSMRefSpec extends WordSpec with MustMatchers with TestKit { when(2) { case Ev("back") ⇒ goto(1) using "back" } - }) + }, "test-fsm-ref-1") fsm.stateName must be(1) fsm.stateData must be("") fsm ! "go" @@ -49,14 +49,12 @@ class TestFSMRefSpec extends WordSpec with MustMatchers with TestKit { when(1) { case x ⇒ stay } - }) + }, "test-fsm-ref-2") fsm.timerActive_?("test") must be(false) fsm.setTimer("test", 12, 10 millis, true) fsm.timerActive_?("test") must be(true) fsm.cancelTimer("test") fsm.timerActive_?("test") must be(false) } - } - }