From 460dcfe51626ef4fb7261b1e9854b034bf250145 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Mon, 5 Jul 2010 12:48:26 +0200 Subject: [PATCH] Tests for stopping active object endpoints; minor refactoring in ConsumerPublisher --- .../src/main/scala/ConsumerPublisher.scala | 35 ++++++++++++----- .../test/scala/CamelServiceFeatureTest.scala | 38 +++++++++++++++++-- .../src/test/scala/PublishRequestorTest.scala | 13 +++++++ 3 files changed, 73 insertions(+), 13 deletions(-) diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala index 8e0062f065..8d29739f02 100644 --- a/akka-camel/src/main/scala/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/ConsumerPublisher.scala @@ -303,6 +303,26 @@ private[camel] object ConsumerUnregistered { } } +/** + * @author Martin Krasser + */ +private[camel] object ConsumerMethod { + /** + * Applies a function f to each consumer method of activeObject and + * returns the function results as a list. A consumer method is one that is annotated with + * @consume. If activeObject is a proxy for a remote active object + * f is never called and Nil is returned. + */ + def forConsumer[T](activeObject: AnyRef, init: AspectInit)(f: Method => T): List[T] = { + // TODO: support consumer annotation inheritance + // - visit overridden methods in superclasses + // - visit implemented method declarations in interfaces + if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints + else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) + yield f(m) + } +} + /** * @author Martin Krasser */ @@ -313,12 +333,9 @@ private[camel] object ConsumerMethodRegistered { * have any @consume annotated methods. */ def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = { - // TODO: support consumer annotation inheritance - // - visit overridden methods in superclasses - // - visit implemented method declarations in interfaces - if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints - else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) - yield ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m) + ConsumerMethod.forConsumer[ConsumerMethodRegistered](activeObject, init) { + m => ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m) + } } } @@ -329,9 +346,9 @@ private[camel] object ConsumerMethodUnregistered { * have any @consume annotated methods. */ def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = { - if (init.remoteAddress.isDefined) Nil - else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) - yield ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m) + ConsumerMethod.forConsumer[ConsumerMethodUnregistered](activeObject, init) { + m => ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m) + } } } diff --git a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala index 771ed83af3..1e88b62bf2 100644 --- a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala +++ b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala @@ -59,7 +59,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi feature("Unpublish registered consumer actor from the global CamelContext") { - scenario("attempt access to unregistered consumer actor via Camel direct-endpoint") { + scenario("access to unregistered consumer actor via Camel direct-endpoint fails") { val endpointUri = "direct:unpublish-test-1" given("a consumer actor that has been stopped") @@ -78,7 +78,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi when("a request is sent to this actor") val response1 = CamelContextManager.template.requestBody(endpointUri, "msg1") - then("the direct endpoint falls back to its default behaviour and returns the original message") + then("the direct-endpoint falls back to its default behaviour and returns the original message") assert(response1 === "msg1") } } @@ -103,8 +103,8 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi scenario("access active object methods via Camel direct-endpoints") { given("an active object registered after CamelService startup") - val latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get - ActiveObject.newInstance(classOf[PojoBase]) + var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get + val obj = ActiveObject.newInstance(classOf[PojoBase]) assert(latch.await(5000, TimeUnit.MILLISECONDS)) when("requests are sent to published methods") @@ -116,6 +116,36 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi assert(response1 === "m2base: x y") assert(response2 === "m3base: x y") assert(response3 === "m4base: x y") + + // cleanup to avoid conflicts with next test (i.e. avoid multiple consumers on direct-endpoints) + latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get + ActiveObject.stop(obj) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + } + } + + feature("Unpublish active object method from the global CamelContext") { + + scenario("access to unregistered active object methof via Camel direct-endpoint fails") { + + given("an active object that has been stopped") + var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get + val obj = ActiveObject.newInstance(classOf[PojoBase]) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + + latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get + ActiveObject.stop(obj) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + + when("requests are sent to published methods") + val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y") + val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y") + val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y") + + then("the direct-endpoints fall back to their default behaviour and return the original message") + assert(response1 === "x") + assert(response2 === "x") + assert(response3 === "x") } } } diff --git a/akka-camel/src/test/scala/PublishRequestorTest.scala b/akka-camel/src/test/scala/PublishRequestorTest.scala index 7729e6eec6..44c6c30684 100644 --- a/akka-camel/src/test/scala/PublishRequestorTest.scala +++ b/akka-camel/src/test/scala/PublishRequestorTest.scala @@ -44,6 +44,19 @@ class PublishRequestorTest extends JUnitSuite { assert(event.method.getName === "foo") } + @Test def shouldReceiveConsumerMethodUnregisteredEvent = { + val obj = ActiveObject.newInstance(classOf[PojoSingle]) + val init = AspectInit(classOf[PojoSingle], null, None, 1000) + val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get + requestor ! AspectInitUnregistered(obj, init) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + val event = (publisher !! GetRetainedMessage).get.asInstanceOf[ConsumerMethodUnregistered] + assert(event.init === init) + assert(event.uri === "direct:foo") + assert(event.activeObject === obj) + assert(event.method.getName === "foo") + } + @Test def shouldReceiveConsumerRegisteredEvent = { val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get requestor ! ActorRegistered(consumer)