diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala index fe63577c46..517b2efaee 100644 --- a/akka-camel/src/main/scala/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/ConsumerPublisher.scala @@ -16,6 +16,33 @@ import se.scalablesolutions.akka.actor.annotation.consume import se.scalablesolutions.akka.camel.component.ActiveObjectComponent import se.scalablesolutions.akka.util.Logging +private[camel] object ConsumerPublisher extends Logging { + /** + * Creates a route to the registered consumer actor. + */ + def handleConsumerRegistered(event: ConsumerRegistered) { + CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.id, event.uuid)) + log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri)) + } + + /** + * Stops route to the already un-registered consumer actor. + */ + def handleConsumerUnregistered(event: ConsumerUnregistered) { + CamelContextManager.context.stopRoute(event.id) + log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri)) + } + + def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) { + val targetMethod = event.method.getName + val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod) + + CamelContextManager.activeObjectRegistry.put(objectId, event.activeObject) + CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod)) + log.info("published method %s of %s at endpoint %s" format (targetMethod, event.activeObject, event.uri)) + } +} + /** * Actor that publishes consumer actors as Camel endpoints at the CamelContext managed * by se.scalablesolutions.akka.camel.CamelContextManager. It accepts messages of type @@ -24,7 +51,8 @@ import se.scalablesolutions.akka.util.Logging * * @author Martin Krasser */ -private[camel] class ConsumerPublisher extends Actor with Logging { +private[camel] class ConsumerPublisher extends Actor { + import ConsumerPublisher._ @volatile private var latch = new CountDownLatch(0) @@ -51,31 +79,6 @@ private[camel] class ConsumerPublisher extends Actor with Logging { } case _ => { /* ignore */} } - - /** - * Creates a route to the registered consumer actor. - */ - def handleConsumerRegistered(event: ConsumerRegistered) { - CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.id, event.uuid)) - log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri)) - } - - /** - * Stops route to the already un-registered consumer actor. - */ - def handleConsumerUnregistered(event: ConsumerUnregistered) { - CamelContextManager.context.stopRoute(event.id) - log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri)) - } - - def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) { - val targetMethod = event.method.getName - val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod) - - CamelContextManager.activeObjectRegistry.put(objectId, event.activeObject) - CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod)) - log.info("published method %s of %s at endpoint %s" format (targetMethod, event.activeObject, event.uri)) - } } private[camel] case class SetExpectedMessageCount(num: Int) diff --git a/akka-camel/src/main/scala/component/ActiveObjectComponent.scala b/akka-camel/src/main/scala/component/ActiveObjectComponent.scala index a192b10fc9..4ed132a368 100644 --- a/akka-camel/src/main/scala/component/ActiveObjectComponent.scala +++ b/akka-camel/src/main/scala/component/ActiveObjectComponent.scala @@ -54,6 +54,13 @@ class ActiveObjectInfo(context: CamelContext, clazz: Class[_], strategy: Paramet extends BeanInfo(context, clazz, strategy) { protected override def introspect(clazz: Class[_]): Unit = { + + // TODO: fix target class detection in BeanInfo.introspect(Class) + // Camel assumes that classes containing a '$$' in the class name + // are classes generated with CGLIB. This conflicts with proxies + // created from interfaces with AspectWerkz. Once the fix is in + // place this method can be removed. + for (method <- clazz.getDeclaredMethods) { if (isValidMethod(clazz, method)) { introspect(clazz, method) diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoBase.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoBase.java new file mode 100644 index 0000000000..05bf1625bb --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoBase.java @@ -0,0 +1,34 @@ +package se.scalablesolutions.akka.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class PojoBase { + + public String m1(String b, String h) { + return "m1base: " + b + " " + h; + } + + @consume("direct:m2base") + public String m2(@Body String b, @Header("test") String h) { + return "m2base: " + b + " " + h; + } + + @consume("direct:m3base") + public String m3(@Body String b, @Header("test") String h) { + return "m3base: " + b + " " + h; + } + + @consume("direct:m4base") + public String m4(@Body String b, @Header("test") String h) { + return "m4base: " + b + " " + h; + } + + public void m5(@Body String b, @Header("test") String h) { + } +} diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java new file mode 100644 index 0000000000..b48202d4dc --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoImpl.java @@ -0,0 +1,23 @@ +package se.scalablesolutions.akka.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class PojoImpl implements PojoIntf { + + public String m1(String b, String h) { + return "m1impl: " + b + " " + h; + } + + @consume("direct:m2impl") + public String m2(@Body String b, @Header("test") String h) { + return "m2impl: " + b + " " + h; + } + + +} diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoIntf.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoIntf.java new file mode 100644 index 0000000000..14f63afd2e --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoIntf.java @@ -0,0 +1,18 @@ +package se.scalablesolutions.akka.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public interface PojoIntf { + + public String m1(String b, String h); + + @consume("direct:m2intf") + public String m2(@Body String b, @Header("test") String h); + +} diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSingle.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSingle.java new file mode 100644 index 0000000000..7d577535b2 --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSingle.java @@ -0,0 +1,14 @@ +package se.scalablesolutions.akka.camel; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class PojoSingle { + + @consume("direct:foo") + public void foo(String b) { + } + +} diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSub.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSub.java new file mode 100644 index 0000000000..be5b453698 --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoSub.java @@ -0,0 +1,27 @@ +package se.scalablesolutions.akka.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import se.scalablesolutions.akka.actor.annotation.consume; + +public class PojoSub extends PojoBase { + + @Override + @consume("direct:m1sub") + public String m1(@Body String b, @Header("test") String h) { + return "m1sub: " + b + " " + h; + } + + @Override + public String m2(String b, String h) { + return "m2sub: " + b + " " + h; + } + + @Override + @consume("direct:m3sub") + public String m3(@Body String b, @Header("test") String h) { + return "m3sub: " + b + " " + h; + } + +} diff --git a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala index 6abaf007eb..82d3cd74a3 100644 --- a/akka-camel/src/test/scala/CamelServiceFeatureTest.scala +++ b/akka-camel/src/test/scala/CamelServiceFeatureTest.scala @@ -6,7 +6,7 @@ import org.apache.camel.builder.RouteBuilder import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec} import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} +import se.scalablesolutions.akka.actor.{ActiveObject, Actor, ActorRegistry} class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen { import CamelServiceFeatureTest._ @@ -97,6 +97,27 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi assert(response === "received msg3") } } + + feature("Publish active object methods in the global CamelContext") { + + scenario("access active object methods via Camel direct-endpoints") { + + given("two consumer actors registered before and after CamelService startup") + val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(3)).get + ActiveObject.newInstance(classOf[PojoBase]) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + + when("requests are sent to published methods") + val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y") + val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y") + val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y") + + then("each should have returned a different response") + assert(response1 === "m2base: x y") + assert(response2 === "m3base: x y") + assert(response3 === "m4base: x y") + } + } } object CamelServiceFeatureTest { diff --git a/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala b/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala new file mode 100644 index 0000000000..66d63d69e8 --- /dev/null +++ b/akka-camel/src/test/scala/ConsumerMethodRegisteredTest.scala @@ -0,0 +1,46 @@ +package se.scalablesolutions.akka.camel + +import java.net.InetSocketAddress + +import org.junit.Test +import org.scalatest.junit.JUnitSuite + +import se.scalablesolutions.akka.actor.{AspectInit, ActiveObject} +import se.scalablesolutions.akka.camel.ConsumerMethodRegistered._ + +class ConsumerMethodRegisteredTest extends JUnitSuite { + val remoteAddress = new InetSocketAddress("localhost", 8888); + val remoteAspectInit = AspectInit(classOf[String], null, Some(remoteAddress), 1000) + val localAspectInit = AspectInit(classOf[String], null, None, 1000) + + val activePojoBase = ActiveObject.newInstance(classOf[PojoBase]) + val activePojoSub = ActiveObject.newInstance(classOf[PojoSub]) + val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl) + + val ascendingMethodName = (r1: ConsumerMethodRegistered, r2: ConsumerMethodRegistered) => + r1.method.getName < r2.method.getName + + @Test def shouldSelectPojoBaseMethods234 = { + val registered = forConsumer(activePojoBase, localAspectInit).sortWith(ascendingMethodName) + assert(registered.size === 3) + assert(registered.map(_.method.getName) === List("m2", "m3", "m4")) + } + + @Test def shouldSelectPojoSubMethods134 = { + val registered = forConsumer(activePojoSub, localAspectInit).sortWith(ascendingMethodName) + assert(registered.size === 3) + assert(registered.map(_.method.getName) === List("m1", "m3", "m4")) + } + + @Test def shouldSelectPojoIntfMethod2 = { + val registered = forConsumer(activePojoIntf, localAspectInit) + assert(registered.size === 1) + assert(registered(0).method.getName === "m2") + } + + @Test def shouldIgnoreRemoteProxies = { + val registered = forConsumer(activePojoBase, remoteAspectInit) + assert(registered.size === 0) + } + +} \ No newline at end of file diff --git a/akka-camel/src/test/scala/PublishRequestorTest.scala b/akka-camel/src/test/scala/PublishRequestorTest.scala index 3aa43c04cf..f3c9a899b2 100644 --- a/akka-camel/src/test/scala/PublishRequestorTest.scala +++ b/akka-camel/src/test/scala/PublishRequestorTest.scala @@ -5,8 +5,8 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import org.junit.{Before, After, Test} import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorRegistry, ActorRegistered, ActorUnregistered} import se.scalablesolutions.akka.camel.support.{SetExpectedMessageCount => SetExpectedTestMessageCount, _} class PublishRequestorTest extends JUnitSuite { @@ -31,6 +31,19 @@ class PublishRequestorTest extends JUnitSuite { ActorRegistry.shutdownAll } + @Test def shouldReceiveConsumerMethodRegisteredEvent = { + val obj = ActiveObject.newInstance(classOf[PojoSingle]) + val init = AspectInit(classOf[PojoSingle], null, None, 1000) + val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get + requestor ! AspectInitRegistered(obj, init) + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + val event = (publisher !! GetRetainedMessage).get.asInstanceOf[ConsumerMethodRegistered] + assert(event.init === init) + assert(event.uri === "direct:foo") + assert(event.activeObject === obj) + assert(event.method.getName === "foo") + } + @Test def shouldReceiveConsumerRegisteredEvent = { val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get requestor ! ActorRegistered(consumer) @@ -46,9 +59,6 @@ class PublishRequestorTest extends JUnitSuite { assert((publisher !! GetRetainedMessage) === Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid, true))) } - - // TODO: test active object method registration - } object PublishRequestorTest { diff --git a/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala b/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala new file mode 100644 index 0000000000..9239673df2 --- /dev/null +++ b/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala @@ -0,0 +1,74 @@ +package se.scalablesolutions.akka.camel.component + +import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec} + +import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.camel._ +import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject} +import org.apache.camel.{ExchangePattern, Exchange, Processor} + +/** + * @author Martin Krasser + */ +class ActiveObjectComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach { + override protected def beforeAll = { + val activePojoBase = ActiveObject.newInstance(classOf[PojoBase]) + val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl) + + CamelContextManager.init + CamelContextManager.start + + CamelContextManager.activeObjectRegistry.put("base", activePojoBase) + CamelContextManager.activeObjectRegistry.put("intf", activePojoIntf) + } + + override protected def afterAll = { + CamelContextManager.stop + ActorRegistry.shutdownAll + } + + feature("Communicate with an active object from a Camel application using active object endpoint URIs") { + import ActiveObjectComponent.DefaultSchema + import CamelContextManager.template + import ExchangePattern._ + + scenario("in-out exchange with proxy created from interface and method returning String") { + val result = template.requestBodyAndHeader("%s:intf?method=m2" format DefaultSchema, "x", "test", "y") + assert(result === "m2impl: x y") + } + + scenario("in-out exchange with proxy created from class and method returning String") { + val result = template.requestBodyAndHeader("%s:base?method=m2" format DefaultSchema, "x", "test", "y") + assert(result === "m2base: x y") + } + + scenario("in-out exchange with proxy created from class and method returning void") { + val result = template.requestBodyAndHeader("%s:base?method=m5" format DefaultSchema, "x", "test", "y") + assert(result === "x") // returns initial body + } + + scenario("in-only exchange with proxy created from class and method returning String") { + val result = template.send("%s:base?method=m2" format DefaultSchema, InOnly, new Processor { + def process(exchange: Exchange) = { + exchange.getIn.setBody("x") + exchange.getIn.setHeader("test", "y") + } + }); + assert(result.getPattern === InOnly) + assert(result.getIn.getBody === "m2base: x y") + assert(result.getOut.getBody === null) + } + + scenario("in-only exchange with proxy created from class and method returning void") { + val result = template.send("%s:base?method=m5" format DefaultSchema, InOnly, new Processor { + def process(exchange: Exchange) = { + exchange.getIn.setBody("x") + exchange.getIn.setHeader("test", "y") + } + }); + assert(result.getPattern === InOnly) + assert(result.getIn.getBody === "x") + assert(result.getOut.getBody === null) + } + } +} diff --git a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala index 3c3cf80815..f73a2fcd3e 100644 --- a/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala +++ b/akka-camel/src/test/scala/component/ActorComponentFeatureTest.scala @@ -7,8 +7,8 @@ import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.{ActorRegistry, Actor} -import se.scalablesolutions.akka.camel.support._ import se.scalablesolutions.akka.camel.{Message, CamelContextManager} +import se.scalablesolutions.akka.camel.support._ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach { override protected def beforeAll = { diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java index af1ccaf8c4..f08c486dac 100644 --- a/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java @@ -18,7 +18,7 @@ public class Consumer10 { @consume("jetty:http://0.0.0.0:8877/camel/active") public String bar(@Body String body, @Header("name") String header) { - return String.format("%s %s", body, header); + return String.format("body=%s header=%s", body, header); } }