diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala index 6c328c3e36..09b2bc98da 100644 --- a/akka-camel/src/main/scala/Producer.scala +++ b/akka-camel/src/main/scala/Producer.scala @@ -180,9 +180,9 @@ class ProducerResponseSender( def onComplete(exchange: Exchange) = reply(exchange.toResponseMessage(headers)) private def reply(message: Any) = replyTo match { - case Some(Left(actor)) => actor ! message + case Some(Left(actor)) => actor ! message case Some(Right(future)) => future.completeWithResult(message) - case _ => log.warning("No destination for sending response") + case _ => log.warning("No destination for sending response") } } diff --git a/akka-camel/src/main/scala/service/ConsumerPublisher.scala b/akka-camel/src/main/scala/service/ConsumerPublisher.scala index 2592ba4dce..8667791dd9 100644 --- a/akka-camel/src/main/scala/service/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/service/ConsumerPublisher.scala @@ -85,8 +85,8 @@ class PublishRequestor(consumerPublisher: ActorID) extends Actor { protected def receive = { case ActorUnregistered(actor) => { /* ignore */ } case ActorRegistered(actor) => Publish.forConsumer(actor) match { - case Some(publish) => consumerPublisher ! publish - case None => { /* ignore */ } + case Some(publish) => consumerPublisher ! publish + case None => { /* ignore */ } } } } @@ -121,15 +121,15 @@ object Publish { def forConsumer(actor: ActorID): Option[Publish] = forConsumeAnnotated(actor) orElse forConsumerType(actor) - private def forConsumeAnnotated(actor: ActorID): Option[Publish] = { - val annotation = actor.getClass.getAnnotation(classOf[consume]) + private def forConsumeAnnotated(actorId: ActorID): Option[Publish] = { + val annotation = actorId.actorInstanceClass.getAnnotation(classOf[consume]) if (annotation eq null) None - else if (actor.remoteAddress.isDefined) None // do not publish proxies - else Some(Publish(annotation.value, actor.getId, false)) + else if (actorId.remoteAddress.isDefined) None // do not publish proxies + else Some(Publish(annotation.value, actorId.id, false)) } - private def forConsumerType(actor: ActorID): Option[Publish] = - if (!actor.isInstanceOf[Consumer]) None - else if (actor.remoteAddress.isDefined) None - else Some(Publish(actor.asInstanceOf[Consumer].endpointUri, actor.uuid, true)) + private def forConsumerType(actorId: ActorID): Option[Publish] = + if (!actorId.actor.isInstanceOf[Consumer]) None + else if (actorId.remoteAddress.isDefined) None + else Some(Publish(actorId.actor.asInstanceOf[Consumer].endpointUri, actorId.uuid, true)) } diff --git a/akka-camel/src/test/scala/ProducerFeatureTest.scala b/akka-camel/src/test/scala/ProducerFeatureTest.scala index e255dc861f..fdd14edda3 100644 --- a/akka-camel/src/test/scala/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/ProducerFeatureTest.scala @@ -31,7 +31,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before mockEndpoint.reset ActorRegistry.shutdownAll } -/* + feature("Produce a message to a Camel endpoint") { scenario("produce message sync and receive response") { @@ -121,7 +121,6 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before } } -*/ private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint]) class TestRoute extends RouteBuilder { diff --git a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala index c1f00c408f..b876fe14c8 100644 --- a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala +++ b/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala @@ -35,7 +35,6 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi var service: CamelService = CamelService.newInstance - /* override protected def beforeAll = { ActorRegistry.shutdownAll // register test consumer before starting the CamelService @@ -89,5 +88,4 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi assert(response === "received msg3") } } - */ } \ No newline at end of file diff --git a/akka-camel/src/test/scala/service/PublishRequestorTest.scala b/akka-camel/src/test/scala/service/PublishRequestorTest.scala index e462847493..59e9696ee4 100644 --- a/akka-camel/src/test/scala/service/PublishRequestorTest.scala +++ b/akka-camel/src/test/scala/service/PublishRequestorTest.scala @@ -17,7 +17,7 @@ object PublishRequestorTest { def onMessage(msg: Publish) = received = msg } } -/* + class PublishRequestorTest extends JUnitSuite { import PublishRequestorTest._ @@ -27,7 +27,7 @@ class PublishRequestorTest extends JUnitSuite { val consumer = newActor(() => new Actor with Consumer { def endpointUri = "mock:test" protected def receive = null - }) + }).start val publisher = newActor(() => new PublisherMock with Countdown[Publish]) val requestor = newActor(() => new PublishRequestor(publisher)) publisher.start @@ -39,4 +39,3 @@ class PublishRequestorTest extends JUnitSuite { requestor.stop } } -*/ \ No newline at end of file diff --git a/akka-camel/src/test/scala/service/PublishTest.scala b/akka-camel/src/test/scala/service/PublishTest.scala index 7ddb50bd85..69910254ec 100644 --- a/akka-camel/src/test/scala/service/PublishTest.scala +++ b/akka-camel/src/test/scala/service/PublishTest.scala @@ -24,7 +24,7 @@ object PublishTest { protected def receive = null } } -/* + class PublishTest extends JUnitSuite { import PublishTest._ @@ -34,14 +34,13 @@ class PublishTest extends JUnitSuite { } @Test def shouldCreateSomePublishRequestWithActorId = { - val publish = Publish.forConsumers(List(newActor[ConsumeAnnotatedActor])) + val publish = Publish.forConsumer(newActor[ConsumeAnnotatedActor]) assert(publish === Some(Publish("mock:test1", "test", false))) } @Test def shouldCreateSomePublishRequestWithActorUuid = { val ca = newActor[ConsumerActor] - val publish = Publish.forConsumers(List(ca)) - assert(publish === Some(Publish("mock:test2", ca.uuid, true))) + val publish = Publish.forConsumer(ca) assert(publish === Some(Publish("mock:test2", ca.uuid, true))) } @@ -50,4 +49,3 @@ class PublishTest extends JUnitSuite { assert(publish === None) } } -*/ \ No newline at end of file diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 6e6684fe03..2f21432eb8 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -260,12 +260,13 @@ final class ActorID private[akka] () { this() newActorFactory = Left(Some(clazz)) } + private[akka] def this(factory: () => Actor) = { this() newActorFactory = Right(Some(factory)) } - lazy val actor: Actor = { + private[akka] lazy val actor: Actor = { val actor = newActorFactory match { case Left(Some(clazz)) => try { @@ -284,6 +285,12 @@ final class ActorID private[akka] () { if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorID can not be 'null'") actor } + + /** + * Returns the class for the Actor instance that is managed by the ActorID. + */ + def actorInstanceClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]] + /** * Starts up the actor and its message queue. */ @@ -296,12 +303,12 @@ final class ActorID private[akka] () { * Shuts down the actor its dispatcher and message queue. * Alias for 'stop'. */ - protected def exit = actor.stop + protected def exit: Unit = actor.stop /** * Shuts down the actor its dispatcher and message queue. */ - def stop = actor.stop + def stop: Unit = actor.stop /** * Is the actor running? @@ -456,7 +463,7 @@ final class ActorID private[akka] () { /** * Returns the id for the actor. */ - def getId = actor.getId + def id = actor.getId /** * Returns the uuid for the actor. @@ -1026,7 +1033,7 @@ trait Actor extends TransactionManagement with Logging { val server = new RemoteServer server.start(host, port) } - RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.getId, sender.get) + RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.id, sender.get) } RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None) diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 4579b24742..14ab413037 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -97,7 +97,7 @@ object ActorRegistry extends Logging { actorsByUUID.put(actorId.uuid, actorId) // ID - val id = actorId.getId + val id = actorId.id if (id eq null) throw new IllegalStateException("Actor.id is null " + actorId) if (actorsById.containsKey(id)) actorsById.put(id, actorId :: actorsById.get(id)) else actorsById.put(id, actorId :: Nil) @@ -117,7 +117,7 @@ object ActorRegistry extends Logging { */ def unregister(actor: ActorID) = { actorsByUUID remove actor.uuid - actorsById remove actor.getId + actorsById remove actor.id actorsByClassName remove actor.getClass.getName // notify listeners foreachListener(_ ! ActorUnregistered(actor)) @@ -139,6 +139,7 @@ object ActorRegistry extends Logging { * Adds the registration listener this this registry's listener list. */ def addRegistrationListener(listener: ActorID) = { + listener.start registrationListeners.add(listener) } @@ -146,11 +147,16 @@ object ActorRegistry extends Logging { * Removes the registration listener this this registry's listener list. */ def removeRegistrationListener(listener: ActorID) = { + listener.stop registrationListeners.remove(listener) } private def foreachListener(f: (ActorID) => Unit) { val iterator = registrationListeners.iterator - while (iterator.hasNext) f(iterator.next) + while (iterator.hasNext) { + val listener = iterator.next + if (listener.isRunning) f(listener) + else log.warning("Can't send ActorRegistryEvent to [%s] since it is not running.", listener) + } } } \ No newline at end of file diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index 4d0c1363a3..4de358f7ce 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -131,7 +131,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep startLink(actorId) remoteAddress.foreach(address => RemoteServer.actorsFor( RemoteServer.Address(address.hostname, address.port)) - .actors.put(actorId.getId, actorId)) + .actors.put(actorId.id, actorId)) case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration val supervisor = { diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index d1cfb86961..64e72f3fe8 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -200,8 +200,8 @@ class RemoteServer extends Logging { */ def register(actor: ActorID) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId) - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor) + log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id) + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor) } }