diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 0142e18523..3fa5ea9d91 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -6,14 +6,14 @@ package akka.actor import akka.dispatch._ import akka.config.Config._ -import akka.util.Helpers.{narrow, narrowSilently} -import akka.util.ListenerManagement +import akka.util.{ListenerManagement, ReflectiveAccess, Duration, Helpers} +import Helpers.{narrow, narrowSilently} +import akka.remoteinterface.RemoteSupport +import akka.japi.{Creator, Procedure} import akka.AkkaException import scala.reflect.BeanProperty -import akka.util. {ReflectiveAccess, Duration} -import akka.remoteinterface.RemoteSupport -import akka.japi. {Creator, Procedure} + import com.eaio.uuid.UUID /** @@ -21,7 +21,9 @@ import com.eaio.uuid.UUID */ sealed trait LifeCycleMessage extends Serializable -/* Marker trait to show which Messages are automatically handled by Akka */ +/** + * Marker trait to show which Messages are automatically handled by Akka + */ sealed trait AutoReceivedMessage { self: LifeCycleMessage => } case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) @@ -199,12 +201,19 @@ object Actor extends ListenerManagement { Address.validate(address) Deployer.deploymentFor(address) match { - case Deploy(_, _, Local) => + case Deploy(_, router, Local) => + // FIXME handle 'router' in 'Local' actors newLocalActorRef(clazz, address) + case Deploy(_, router, Clustered(Home(hostname, port), Replicate(nrOfReplicas), state)) => RemoteActorRef( address, clazz.getName, Actor.TIMEOUT, None, ActorType.ScalaActor) + + case invalid => throw new IllegalActorStateException( + "Could not create actor [" + clazz.getName + + "] with address [" + address + + "], not bound to a valid deployment scheme [" + invalid + "]") } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index de870bdc8b..2c66d0c276 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -519,20 +519,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal } override def toString = "Actor[" + address + ":" + uuid + "]" - - protected[akka] def checkReceiveTimeout = { - cancelReceiveTimeout - if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed - _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS)) - } - } - - protected[akka] def cancelReceiveTimeout = { - if (_futureTimeout.isDefined) { - _futureTimeout.get.cancel(true) - _futureTimeout = None - } - } } /** @@ -978,7 +964,6 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, Actor.registry.register(this) } - protected[akka] def checkReceiveTimeout = { cancelReceiveTimeout if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 281ba09532..c31090b520 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -151,8 +151,10 @@ object Deployer { val address = deployment.address Address.validate(address) - if (deployments.putIfAbsent(address, deployment) != deployment) - throwDeploymentBoundException(deployment) + if (deployments.putIfAbsent(address, deployment) != deployment) { + // FIXME do automatic 'undeploy' and redeploy (perhaps have it configurable if redeploy should be done or exception thrown) + // throwDeploymentBoundException(deployment) + } deployLocally(deployment) } @@ -190,23 +192,6 @@ object Deployer { deployments.clear() } - def lookupDeploymentFor(address: String): Option[Deploy] = { - val deployment = deployments.get(address) - if (deployments ne null) Some(deployment) - else { - val deployment = - try { - lookupInConfig(address) - } catch { - case e: ConfigurationException => - EventHandler.error(e, this, e.getMessage) - throw e - } - deployment foreach (deploy(_)) - deployment - } - } - /** * Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound. */ @@ -217,13 +202,33 @@ object Deployer { } } - def isLocal(address: String): Boolean = lookupDeploymentFor(address) match { - case Some(Deploy(_, _, Local)) => true - case _ => false + def lookupDeploymentFor(address: String): Option[Deploy] = { + val deployment = deployments.get(address) + if (deployment ne null) Some(deployment) + else { + val deployment = + try { + lookupInConfig(address) + } catch { + case e: ConfigurationException => + EventHandler.error(e, this, e.getMessage) + throw e + } + deployment foreach { d => + if (d eq null) { + val e = new IllegalStateException("Deployment for address [" + address + "] is null") + EventHandler.error(e, this, e.getMessage) + throw e + } + deploy(d) + } + deployment + } } - def isClustered(address: String): Boolean = !isLocal(address) - + /** + * Lookup deployment in 'akka.conf' configuration file. + */ def lookupInConfig(address: String): Option[Deploy] = { // -------------------------------- @@ -314,6 +319,13 @@ object Deployer { } } + def isLocal(address: String): Boolean = lookupDeploymentFor(address) match { + case Some(Deploy(_, _, Local)) => true + case _ => false + } + + def isClustered(address: String): Boolean = !isLocal(address) + private def throwDeploymentBoundException(deployment: Deploy): Nothing = { val e = new DeploymentBoundException( "Address [" + deployment.address + diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 7e1abd580d..78314557d4 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -98,10 +98,14 @@ object EventHandler extends ListenerManagement { "Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]") } + def start() { + info(this, "Starting up EventHandler") + } + /** * Shuts down all event handler listeners including the event handle dispatcher. */ - def shutdown() = { + def shutdown() { foreachListener(_.stop) EventHandlerDispatcher.shutdown } @@ -222,10 +226,13 @@ object EventHandler extends ListenerManagement { addListener(Actor.actorOf(clazz, listenerName).start) } } catch { + case e: akka.actor.DeploymentBoundException => // do nothing case e: Exception => throw new ConfigurationException( "Event Handler specified in config can't be loaded [" + listenerName + "] due to [" + e.toString + "]") } } + + start() } diff --git a/akka-samples/akka-sample-osgi/src/main/scala/OsgiExample.scala b/akka-samples/akka-sample-osgi/src/main/scala/OsgiExample.scala index 77134e6e3c..092fcf9d36 100644 --- a/akka-samples/akka-sample-osgi/src/main/scala/OsgiExample.scala +++ b/akka-samples/akka-sample-osgi/src/main/scala/OsgiExample.scala @@ -18,7 +18,7 @@ class Activator extends BundleActivator { } def stop(context: BundleContext) { - Actor.registry.shutdownAll() + Actor.registry.local.shutdownAll() println("Stopped the OSGi example.") } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index b8214f0fbf..158931d987 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -1,9 +1,15 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + package akka.testkit import akka.actor._ import akka.util.ReflectiveAccess import akka.event.EventHandler +import com.eaio.uuid.UUID + /** * This special ActorRef is exclusively for use during unit testing in a single-threaded environment. Therefore, it * overrides the dispatcher to CallingThreadDispatcher and sets the receiveTimeout to None. Otherwise, @@ -13,7 +19,7 @@ import akka.event.EventHandler * @author Roland Kuhn * @since 1.1 */ -class TestActorRef[T <: Actor](factory: () => T) extends LocalActorRef(factory) { +class TestActorRef[T <: Actor](factory: () => T, address: String) extends LocalActorRef(factory, address) { dispatcher = CallingThreadDispatcher.global receiveTimeout = None @@ -45,11 +51,11 @@ class TestActorRef[T <: Actor](factory: () => T) extends LocalActorRef(factory) this } - override def toString = "TestActor[" + id + ":" + uuid + "]" + override def toString = "TestActor[" + address + ":" + uuid + "]" override def equals(other : Any) = other.isInstanceOf[TestActorRef[_]] && - other.asInstanceOf[TestActorRef[_]].uuid == uuid + other.asInstanceOf[TestActorRef[_]].uuid == uuid /** * Override to check whether the new supervisor is running on the CallingThreadDispatcher, @@ -68,9 +74,14 @@ class TestActorRef[T <: Actor](factory: () => T) extends LocalActorRef(factory) object TestActorRef { - def apply[T <: Actor](factory: => T) = new TestActorRef(() => factory) + def apply[T <: Actor](factory: => T): TestActorRef[T] = apply[T](factory, new UUID().toString) + + def apply[T <: Actor](factory: => T, address: String): TestActorRef[T] = new TestActorRef(() => factory, address) + + def apply[T <: Actor : Manifest]: TestActorRef[T] = apply[T](new UUID().toString) + + def apply[T <: Actor : Manifest](address: String): TestActorRef[T] = new TestActorRef[T] ({ () => - def apply[T <: Actor : Manifest] : TestActorRef[T] = new TestActorRef[T] ({ () => import ReflectiveAccess.{ createInstance, noParams, noArgs } createInstance[T](manifest[T].erasure, noParams, noArgs).getOrElse( throw new ActorInitializationException( @@ -78,6 +89,6 @@ object TestActorRef { "\nMake sure Actor is NOT defined inside a class/trait," + "\nif so put it outside the class/trait, f.e. in a companion object," + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) - }) + }, address) } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 47c4908148..5a89cac4ab 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -91,6 +91,8 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac import TestActorRefSpec._ + EventHandler.start() + override def beforeEach { otherthread = null } @@ -177,9 +179,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac def receiveT = { case "sendKill" => ref ! Kill } }).start() - val l = stopLog() boss ! "sendKill" - startLog(l) counter must be (0) assertThread @@ -244,15 +244,4 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac } } - - private def stopLog() = { - val l = Actor.registry.actorsFor[EventHandler.DefaultListener] - l foreach (EventHandler.removeListener(_)) - l - } - - private def startLog(l : Array[ActorRef]) { - l foreach {a => EventHandler.addListener(Actor.actorOf[EventHandler.DefaultListener])} - } - } diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorRegistrySpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorRegistrySpec.scala index f1c995e393..e75ad5617a 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorRegistrySpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorRegistrySpec.scala @@ -15,64 +15,5 @@ class TypedActorRegistrySpec extends WordSpec with MustMatchers { import TypedActorRegistrySpec._ "Typed Actor" should { -<<<<<<< HEAD -======= - - "be able to be retrieved from the registry by class" in { - Actor.registry.shutdownAll() - val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000) - val actors = Actor.registry.typedActorsFor(classOf[My]) - actors.length must be (1) - Actor.registry.shutdownAll() - } - - "be able to be retrieved from the registry by manifest" in { - Actor.registry.shutdownAll() - val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000) - val option = Actor.registry.typedActorFor[My] - option must not be (null) - option.isDefined must be (true) - Actor.registry.shutdownAll() - } - - "be able to be retrieved from the registry by class two times" in { - Actor.registry.shutdownAll() - val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000) - val actors1 = Actor.registry.typedActorsFor(classOf[My]) - actors1.length must be (1) - val actors2 = Actor.registry.typedActorsFor(classOf[My]) - actors2.length must be (1) - Actor.registry.shutdownAll() - } - - "be able to be retrieved from the registry by manifest two times" in { - Actor.registry.shutdownAll() - val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000) - val option1 = Actor.registry.typedActorFor[My] - option1 must not be (null) - option1.isDefined must be (true) - val option2 = Actor.registry.typedActorFor[My] - option2 must not be (null) - option2.isDefined must be (true) - Actor.registry.shutdownAll() - } - - "be able to be retrieved from the registry by manifest two times (even when created in supervisor)" in { - Actor.registry.shutdownAll() - val manager = new TypedActorConfigurator - manager.configure( - OneForOneStrategy(classOf[Exception] :: Nil, 3, 1000), - Array(new SuperviseTypedActor(classOf[My], classOf[MyImpl], Permanent, 6000)) - ).supervise - - val option1 = Actor.registry.typedActorFor[My] - option1 must not be (null) - option1.isDefined must be (true) - val option2 = Actor.registry.typedActorFor[My] - option2 must not be (null) - option2.isDefined must be (true) - Actor.registry.shutdownAll() - } ->>>>>>> wip-rebase } } diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala index 02198c82c5..aaecdf3748 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala @@ -65,11 +65,7 @@ class TypedActorSpec extends } override def afterEach() { -<<<<<<< HEAD Actor.registry.local.shutdownAll -======= - Actor.registry.shutdownAll() ->>>>>>> wip-rebase } describe("TypedActor") { @@ -126,26 +122,12 @@ class TypedActorSpec extends assert(Actor.registry.local.typedActorFor(uuid).get === simplePojo) } -<<<<<<< HEAD it("should support finding a typed actor by address ") { val typedActorRef = TypedActor.actorFor(simplePojo).get val address = typedActorRef.address assert(Actor.registry.local.typedActorFor(newUuid().toString) === None) assert(Actor.registry.local.typedActorFor(address).isDefined) assert(Actor.registry.local.typedActorFor(address).get === simplePojo) -======= - it("should support finding typed actors by id ") { - val typedActors = Actor.registry.typedActorsFor("my-custom-id") - assert(typedActors.length === 1) - assert(typedActors.contains(pojo)) - - // creating untyped actor with same custom id - val actorRef = Actor.actorOf[MyActor].start() - val typedActors2 = Actor.registry.typedActorsFor("my-custom-id") - assert(typedActors2.length === 1) - assert(typedActors2.contains(pojo)) - actorRef.stop() ->>>>>>> wip-rebase } it("should support to filter typed actors") { @@ -162,7 +144,6 @@ class TypedActorSpec extends } it("should support foreach for typed actors") { -<<<<<<< HEAD val actorRef = Actor.actorOf[MyActor].start assert(Actor.registry.local.actors.size === 3) assert(Actor.registry.local.typedActors.size === 2) @@ -178,23 +159,6 @@ class TypedActorSpec extends Actor.registry.local.shutdownAll() assert(Actor.registry.local.actors.size === 0) assert(Actor.registry.local.typedActors.size === 0) -======= - val actorRef = Actor.actorOf[MyActor].start() - assert(Actor.registry.actors.size === 3) - assert(Actor.registry.typedActors.size === 2) - Actor.registry.foreachTypedActor(TypedActor.stop(_)) - assert(Actor.registry.actors.size === 1) - assert(Actor.registry.typedActors.size === 0) - } - - it("should shutdown all typed and untyped actors") { - val actorRef = Actor.actorOf[MyActor].start() - assert(Actor.registry.actors.size === 3) - assert(Actor.registry.typedActors.size === 2) - Actor.registry.shutdownAll() - assert(Actor.registry.actors.size === 0) - assert(Actor.registry.typedActors.size === 0) ->>>>>>> wip-rebase } } } diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 363a2228d1..b991571289 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -196,14 +196,15 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val akka_actor_tests = project("akka-actor-tests", "akka-actor-tests", new AkkaActorTestsProject(_), akka_testkit) lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor) lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm, akka_actor_tests) - lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor) - lazy val akka_zookeeper = project("akka-zookeeper", "akka-zookeeper", new AkkaZookeeperProject(_), akka_remote) - lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterProject(_), akka_zookeeper) + +// lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor) +// lazy val akka_zookeeper = project("akka-zookeeper", "akka-zookeeper", new AkkaZookeeperProject(_), akka_remote) +// lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterProject(_), akka_zookeeper) lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_actor) - lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_)) lazy val akka_slf4j = project("akka-slf4j", "akka-slf4j", new AkkaSlf4jProject(_), akka_actor) lazy val akka_tutorials = project("akka-tutorials", "akka-tutorials", new AkkaTutorialsParentProject(_), akka_actor) + lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_)) // ------------------------------------------------------------------------------------------------------------------- // Miscellaneous @@ -436,10 +437,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { new AkkaSampleAntsProject(_), akka_stm) lazy val akka_sample_fsm = project("akka-sample-fsm", "akka-sample-fsm", new AkkaSampleFSMProject(_), akka_actor) - lazy val akka_sample_remote = project("akka-sample-remote", "akka-sample-remote", - new AkkaSampleRemoteProject(_), akka_remote) - lazy val akka_sample_chat = project("akka-sample-chat", "akka-sample-chat", - new AkkaSampleChatProject(_), akka_remote) +// lazy val akka_sample_remote = project("akka-sample-remote", "akka-sample-remote", +// new AkkaSampleRemoteProject(_), akka_remote) +// lazy val akka_sample_chat = project("akka-sample-chat", "akka-sample-chat", +// new AkkaSampleChatProject(_), akka_remote) lazy val akka_sample_osgi = project("akka-sample-osgi", "akka-sample-osgi", new AkkaSampleOsgiProject(_), akka_actor)