Initial tests for active object support

This commit is contained in:
Martin Krasser 2010-06-04 17:37:32 +02:00
parent 5402165d85
commit f2e5fb1c0a
13 changed files with 310 additions and 33 deletions

View file

@ -16,6 +16,33 @@ import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.component.ActiveObjectComponent
import se.scalablesolutions.akka.util.Logging
private[camel] object ConsumerPublisher extends Logging {
/**
* Creates a route to the registered consumer actor.
*/
def handleConsumerRegistered(event: ConsumerRegistered) {
CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.id, event.uuid))
log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri))
}
/**
* Stops route to the already un-registered consumer actor.
*/
def handleConsumerUnregistered(event: ConsumerUnregistered) {
CamelContextManager.context.stopRoute(event.id)
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri))
}
def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) {
val targetMethod = event.method.getName
val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
CamelContextManager.activeObjectRegistry.put(objectId, event.activeObject)
CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod))
log.info("published method %s of %s at endpoint %s" format (targetMethod, event.activeObject, event.uri))
}
}
/**
* Actor that publishes consumer actors as Camel endpoints at the CamelContext managed
* by se.scalablesolutions.akka.camel.CamelContextManager. It accepts messages of type
@ -24,7 +51,8 @@ import se.scalablesolutions.akka.util.Logging
*
* @author Martin Krasser
*/
private[camel] class ConsumerPublisher extends Actor with Logging {
private[camel] class ConsumerPublisher extends Actor {
import ConsumerPublisher._
@volatile private var latch = new CountDownLatch(0)
@ -51,31 +79,6 @@ private[camel] class ConsumerPublisher extends Actor with Logging {
}
case _ => { /* ignore */}
}
/**
* Creates a route to the registered consumer actor.
*/
def handleConsumerRegistered(event: ConsumerRegistered) {
CamelContextManager.context.addRoutes(new ConsumerActorRoute(event.uri, event.id, event.uuid))
log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri))
}
/**
* Stops route to the already un-registered consumer actor.
*/
def handleConsumerUnregistered(event: ConsumerUnregistered) {
CamelContextManager.context.stopRoute(event.id)
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri))
}
def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) {
val targetMethod = event.method.getName
val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
CamelContextManager.activeObjectRegistry.put(objectId, event.activeObject)
CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod))
log.info("published method %s of %s at endpoint %s" format (targetMethod, event.activeObject, event.uri))
}
}
private[camel] case class SetExpectedMessageCount(num: Int)

View file

@ -54,6 +54,13 @@ class ActiveObjectInfo(context: CamelContext, clazz: Class[_], strategy: Paramet
extends BeanInfo(context, clazz, strategy) {
protected override def introspect(clazz: Class[_]): Unit = {
// TODO: fix target class detection in BeanInfo.introspect(Class)
// Camel assumes that classes containing a '$$' in the class name
// are classes generated with CGLIB. This conflicts with proxies
// created from interfaces with AspectWerkz. Once the fix is in
// place this method can be removed.
for (method <- clazz.getDeclaredMethods) {
if (isValidMethod(clazz, method)) {
introspect(clazz, method)

View file

@ -0,0 +1,34 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class PojoBase {
public String m1(String b, String h) {
return "m1base: " + b + " " + h;
}
@consume("direct:m2base")
public String m2(@Body String b, @Header("test") String h) {
return "m2base: " + b + " " + h;
}
@consume("direct:m3base")
public String m3(@Body String b, @Header("test") String h) {
return "m3base: " + b + " " + h;
}
@consume("direct:m4base")
public String m4(@Body String b, @Header("test") String h) {
return "m4base: " + b + " " + h;
}
public void m5(@Body String b, @Header("test") String h) {
}
}

View file

@ -0,0 +1,23 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class PojoImpl implements PojoIntf {
public String m1(String b, String h) {
return "m1impl: " + b + " " + h;
}
@consume("direct:m2impl")
public String m2(@Body String b, @Header("test") String h) {
return "m2impl: " + b + " " + h;
}
}

View file

@ -0,0 +1,18 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public interface PojoIntf {
public String m1(String b, String h);
@consume("direct:m2intf")
public String m2(@Body String b, @Header("test") String h);
}

View file

@ -0,0 +1,14 @@
package se.scalablesolutions.akka.camel;
import se.scalablesolutions.akka.actor.annotation.consume;
/**
* @author Martin Krasser
*/
public class PojoSingle {
@consume("direct:foo")
public void foo(String b) {
}
}

View file

@ -0,0 +1,27 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.annotation.consume;
public class PojoSub extends PojoBase {
@Override
@consume("direct:m1sub")
public String m1(@Body String b, @Header("test") String h) {
return "m1sub: " + b + " " + h;
}
@Override
public String m2(String b, String h) {
return "m2sub: " + b + " " + h;
}
@Override
@consume("direct:m3sub")
public String m3(@Body String b, @Header("test") String h) {
return "m3sub: " + b + " " + h;
}
}

View file

@ -6,7 +6,7 @@ import org.apache.camel.builder.RouteBuilder
import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import se.scalablesolutions.akka.actor.{ActiveObject, Actor, ActorRegistry}
class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen {
import CamelServiceFeatureTest._
@ -97,6 +97,27 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
assert(response === "received msg3")
}
}
feature("Publish active object methods in the global CamelContext") {
scenario("access active object methods via Camel direct-endpoints") {
given("two consumer actors registered before and after CamelService startup")
val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(3)).get
ActiveObject.newInstance(classOf[PojoBase])
assert(latch.await(5000, TimeUnit.MILLISECONDS))
when("requests are sent to published methods")
val response1 = CamelContextManager.template.requestBodyAndHeader("direct:m2base", "x", "test", "y")
val response2 = CamelContextManager.template.requestBodyAndHeader("direct:m3base", "x", "test", "y")
val response3 = CamelContextManager.template.requestBodyAndHeader("direct:m4base", "x", "test", "y")
then("each should have returned a different response")
assert(response1 === "m2base: x y")
assert(response2 === "m3base: x y")
assert(response3 === "m4base: x y")
}
}
}
object CamelServiceFeatureTest {

View file

@ -0,0 +1,46 @@
package se.scalablesolutions.akka.camel
import java.net.InetSocketAddress
import org.junit.Test
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.{AspectInit, ActiveObject}
import se.scalablesolutions.akka.camel.ConsumerMethodRegistered._
class ConsumerMethodRegisteredTest extends JUnitSuite {
val remoteAddress = new InetSocketAddress("localhost", 8888);
val remoteAspectInit = AspectInit(classOf[String], null, Some(remoteAddress), 1000)
val localAspectInit = AspectInit(classOf[String], null, None, 1000)
val activePojoBase = ActiveObject.newInstance(classOf[PojoBase])
val activePojoSub = ActiveObject.newInstance(classOf[PojoSub])
val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl)
val ascendingMethodName = (r1: ConsumerMethodRegistered, r2: ConsumerMethodRegistered) =>
r1.method.getName < r2.method.getName
@Test def shouldSelectPojoBaseMethods234 = {
val registered = forConsumer(activePojoBase, localAspectInit).sortWith(ascendingMethodName)
assert(registered.size === 3)
assert(registered.map(_.method.getName) === List("m2", "m3", "m4"))
}
@Test def shouldSelectPojoSubMethods134 = {
val registered = forConsumer(activePojoSub, localAspectInit).sortWith(ascendingMethodName)
assert(registered.size === 3)
assert(registered.map(_.method.getName) === List("m1", "m3", "m4"))
}
@Test def shouldSelectPojoIntfMethod2 = {
val registered = forConsumer(activePojoIntf, localAspectInit)
assert(registered.size === 1)
assert(registered(0).method.getName === "m2")
}
@Test def shouldIgnoreRemoteProxies = {
val registered = forConsumer(activePojoBase, remoteAspectInit)
assert(registered.size === 0)
}
}

View file

@ -5,8 +5,8 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.junit.{Before, After, Test}
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorRegistry, ActorRegistered, ActorUnregistered}
import se.scalablesolutions.akka.camel.support.{SetExpectedMessageCount => SetExpectedTestMessageCount, _}
class PublishRequestorTest extends JUnitSuite {
@ -31,6 +31,19 @@ class PublishRequestorTest extends JUnitSuite {
ActorRegistry.shutdownAll
}
@Test def shouldReceiveConsumerMethodRegisteredEvent = {
val obj = ActiveObject.newInstance(classOf[PojoSingle])
val init = AspectInit(classOf[PojoSingle], null, None, 1000)
val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get
requestor ! AspectInitRegistered(obj, init)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val event = (publisher !! GetRetainedMessage).get.asInstanceOf[ConsumerMethodRegistered]
assert(event.init === init)
assert(event.uri === "direct:foo")
assert(event.activeObject === obj)
assert(event.method.getName === "foo")
}
@Test def shouldReceiveConsumerRegisteredEvent = {
val latch = publisher.!![CountDownLatch](SetExpectedTestMessageCount(1)).get
requestor ! ActorRegistered(consumer)
@ -46,9 +59,6 @@ class PublishRequestorTest extends JUnitSuite {
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid, true)))
}
// TODO: test active object method registration
}
object PublishRequestorTest {

View file

@ -0,0 +1,74 @@
package se.scalablesolutions.akka.camel.component
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel._
import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject}
import org.apache.camel.{ExchangePattern, Exchange, Processor}
/**
* @author Martin Krasser
*/
class ActiveObjectComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach {
override protected def beforeAll = {
val activePojoBase = ActiveObject.newInstance(classOf[PojoBase])
val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl)
CamelContextManager.init
CamelContextManager.start
CamelContextManager.activeObjectRegistry.put("base", activePojoBase)
CamelContextManager.activeObjectRegistry.put("intf", activePojoIntf)
}
override protected def afterAll = {
CamelContextManager.stop
ActorRegistry.shutdownAll
}
feature("Communicate with an active object from a Camel application using active object endpoint URIs") {
import ActiveObjectComponent.DefaultSchema
import CamelContextManager.template
import ExchangePattern._
scenario("in-out exchange with proxy created from interface and method returning String") {
val result = template.requestBodyAndHeader("%s:intf?method=m2" format DefaultSchema, "x", "test", "y")
assert(result === "m2impl: x y")
}
scenario("in-out exchange with proxy created from class and method returning String") {
val result = template.requestBodyAndHeader("%s:base?method=m2" format DefaultSchema, "x", "test", "y")
assert(result === "m2base: x y")
}
scenario("in-out exchange with proxy created from class and method returning void") {
val result = template.requestBodyAndHeader("%s:base?method=m5" format DefaultSchema, "x", "test", "y")
assert(result === "x") // returns initial body
}
scenario("in-only exchange with proxy created from class and method returning String") {
val result = template.send("%s:base?method=m2" format DefaultSchema, InOnly, new Processor {
def process(exchange: Exchange) = {
exchange.getIn.setBody("x")
exchange.getIn.setHeader("test", "y")
}
});
assert(result.getPattern === InOnly)
assert(result.getIn.getBody === "m2base: x y")
assert(result.getOut.getBody === null)
}
scenario("in-only exchange with proxy created from class and method returning void") {
val result = template.send("%s:base?method=m5" format DefaultSchema, InOnly, new Processor {
def process(exchange: Exchange) = {
exchange.getIn.setBody("x")
exchange.getIn.setHeader("test", "y")
}
});
assert(result.getPattern === InOnly)
assert(result.getIn.getBody === "x")
assert(result.getOut.getBody === null)
}
}
}

View file

@ -7,8 +7,8 @@ import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor}
import se.scalablesolutions.akka.camel.support._
import se.scalablesolutions.akka.camel.{Message, CamelContextManager}
import se.scalablesolutions.akka.camel.support._
class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach {
override protected def beforeAll = {

View file

@ -18,7 +18,7 @@ public class Consumer10 {
@consume("jetty:http://0.0.0.0:8877/camel/active")
public String bar(@Body String body, @Header("name") String header) {
return String.format("%s %s", body, header);
return String.format("body=%s header=%s", body, header);
}
}