diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala
index 8e0062f065..8d29739f02 100644
--- a/akka-camel/src/main/scala/ConsumerPublisher.scala
+++ b/akka-camel/src/main/scala/ConsumerPublisher.scala
@@ -303,6 +303,26 @@ private[camel] object ConsumerUnregistered {
}
}
+/**
+ * @author Martin Krasser
+ */
+private[camel] object ConsumerMethod {
+ /**
+ * Applies a function f to each consumer method of activeObject and
+ * returns the function results as a list. A consumer method is one that is annotated with
+ * @consume. If activeObject is a proxy for a remote active object
+ * f is never called and Nil is returned.
+ */
+ def forConsumer[T](activeObject: AnyRef, init: AspectInit)(f: Method => T): List[T] = {
+ // TODO: support consumer annotation inheritance
+ // - visit overridden methods in superclasses
+ // - visit implemented method declarations in interfaces
+ if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints
+ else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
+ yield f(m)
+ }
+}
+
/**
* @author Martin Krasser
*/
@@ -313,12 +333,9 @@ private[camel] object ConsumerMethodRegistered {
* have any @consume annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
- // TODO: support consumer annotation inheritance
- // - visit overridden methods in superclasses
- // - visit implemented method declarations in interfaces
- if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints
- else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
- yield ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
+ ConsumerMethod.forConsumer[ConsumerMethodRegistered](activeObject, init) {
+ m => ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
+ }
}
}
@@ -329,9 +346,9 @@ private[camel] object ConsumerMethodUnregistered {
* have any @consume annotated methods.
*/
def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = {
- if (init.remoteAddress.isDefined) Nil
- else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume])))
- yield ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
+ ConsumerMethod.forConsumer[ConsumerMethodUnregistered](activeObject, init) {
+ m => ConsumerMethodUnregistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m)
+ }
}
}
diff --git a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala
index 771ed83af3..1e88b62bf2 100644
--- a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala
+++ b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala
@@ -59,7 +59,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
feature("Unpublish registered consumer actor from the global CamelContext") {
- scenario("attempt access to unregistered consumer actor via Camel direct-endpoint") {
+ scenario("access to unregistered consumer actor via Camel direct-endpoint fails") {
val endpointUri = "direct:unpublish-test-1"
given("a consumer actor that has been stopped")
@@ -78,7 +78,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
when("a request is sent to this actor")
val response1 = CamelContextManager.template.requestBody(endpointUri, "msg1")
- then("the direct endpoint falls back to its default behaviour and returns the original message")
+ then("the direct-endpoint falls back to its default behaviour and returns the original message")
assert(response1 === "msg1")
}
}
@@ -103,8 +103,8 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access active object methods via Camel direct-endpoints") {
given("an active object registered after CamelService startup")
- val latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
- ActiveObject.newInstance(classOf[PojoBase])
+ var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
+ val obj = ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to published methods")
@@ -116,6 +116,36 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
assert(response1 === "m2base: x y")
assert(response2 === "m3base: x y")
assert(response3 === "m4base: x y")
+
+ // cleanup to avoid conflicts with next test (i.e. avoid multiple consumers on direct-endpoints)
+ latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
+ ActiveObject.stop(obj)
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+ }
+ }
+
+ feature("Unpublish active object method from the global CamelContext") {
+
+ scenario("access to unregistered active object methof via Camel direct-endpoint fails") {
+
+ given("an active object that has been stopped")
+ var latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
+ val obj = ActiveObject.newInstance(classOf[PojoBase])
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+
+ latch = (service.consumerPublisher !! SetExpectedMessageCount(3)).as[CountDownLatch].get
+ ActiveObject.stop(obj)
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+
+ when("requests are sent to published methods")
+ val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y")
+ val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y")
+ val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y")
+
+ then("the direct-endpoints fall back to their default behaviour and return the original message")
+ assert(response1 === "x")
+ assert(response2 === "x")
+ assert(response3 === "x")
}
}
}
diff --git a/akka-camel/src/test/scala/PublishRequestorTest.scala b/akka-camel/src/test/scala/PublishRequestorTest.scala
index 7729e6eec6..44c6c30684 100644
--- a/akka-camel/src/test/scala/PublishRequestorTest.scala
+++ b/akka-camel/src/test/scala/PublishRequestorTest.scala
@@ -44,6 +44,19 @@ class PublishRequestorTest extends JUnitSuite {
assert(event.method.getName === "foo")
}
+ @Test def shouldReceiveConsumerMethodUnregisteredEvent = {
+ val obj = ActiveObject.newInstance(classOf[PojoSingle])
+ val init = AspectInit(classOf[PojoSingle], null, None, 1000)
+ val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
+ requestor ! AspectInitUnregistered(obj, init)
+ assert(latch.await(5000, TimeUnit.MILLISECONDS))
+ val event = (publisher !! GetRetainedMessage).get.asInstanceOf[ConsumerMethodUnregistered]
+ assert(event.init === init)
+ assert(event.uri === "direct:foo")
+ assert(event.activeObject === obj)
+ assert(event.method.getName === "foo")
+ }
+
@Test def shouldReceiveConsumerRegisteredEvent = {
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
requestor ! ActorRegistered(consumer)