diff --git a/akka-camel/src/main/java/akka/camel/consume.java b/akka-camel/src/main/java/akka/camel/consume.java index 90faa14372..ebcc2efd29 100644 --- a/akka-camel/src/main/java/akka/camel/consume.java +++ b/akka-camel/src/main/java/akka/camel/consume.java @@ -24,4 +24,11 @@ public @interface consume { */ public abstract String value(); + /** + * Route definition handler class for customizing route to annotated method. + * The handler class must have a default constructor. + */ + public abstract Class extends RouteDefinitionHandler> routeDefinitionHandler() + default RouteDefinitionIdentity.class; + } diff --git a/akka-camel/src/main/scala/Consumer.scala b/akka-camel/src/main/scala/Consumer.scala index b95856ad95..8720f34a39 100644 --- a/akka-camel/src/main/scala/Consumer.scala +++ b/akka-camel/src/main/scala/Consumer.scala @@ -18,12 +18,12 @@ import akka.japi.{Function => JFunction} * @author Martin Krasser */ trait Consumer { self: Actor => - import Consumer.Handler + import RouteDefinitionHandler._ /** * The default route definition handler is the identity function */ - private[camel] var routeDefinitionHandler: Handler = { rd: RouteDefinition => rd } + private[camel] var routeDefinitionHandler: RouteDefinitionHandler = identity /** * Returns the Camel endpoint URI to consume messages from. @@ -40,7 +40,14 @@ trait Consumer { self: Actor => /** * Sets the route definition handler for creating a custom route to this consumer instance. */ - def onRouteDefinition(h: Handler): Unit = routeDefinitionHandler = h + def onRouteDefinition(h: RouteDefinition => ProcessorDefinition[_]): Unit = onRouteDefinition(from(h)) + + /** + * Sets the route definition handler for creating a custom route to this consumer instance. + *
+ * Java API.
+ */
+ def onRouteDefinition(h: RouteDefinitionHandler): Unit = routeDefinitionHandler = h
}
/**
@@ -67,12 +74,6 @@ trait UntypedConsumer extends Consumer { self: UntypedActor =>
* doesn't have any effect on one-way communications (they'll never block).
*/
def isBlocking() = super.blocking
-
- /**
- * Sets the route definition handler for creating a custom route to this consumer instance.
- */
- def onRouteDefinition(h: JFunction[RouteDefinition, ProcessorDefinition[_]]): Unit =
- onRouteDefinition { rd: RouteDefinition => h(rd) }
}
/**
@@ -95,17 +96,46 @@ abstract class RemoteUntypedConsumerActor(address: InetSocketAddress) extends Re
def this(host: String, port: Int) = this(new InetSocketAddress(host, port))
}
+/**
+ * A callback handler for route definitions to consumer actors.
+ *
+ * @author Martin Krasser
+ */
+trait RouteDefinitionHandler {
+ def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_]
+}
+
+/**
+ * The identity route definition handler.
+ *
+ * @author Martin Krasser
+ *
+ */
+class RouteDefinitionIdentity extends RouteDefinitionHandler {
+ def onRouteDefinition(rd: RouteDefinition) = rd
+}
+
+/**
+ * @author Martin Krasser
+ */
+object RouteDefinitionHandler {
+ /**
+ * Returns the identity route definition handler
+ */
+ val identity = new RouteDefinitionIdentity
+
+ /**
+ * Created a route definition handler from the given function.
+ */
+ def from(f: RouteDefinition => ProcessorDefinition[_]) = new RouteDefinitionHandler {
+ def onRouteDefinition(rd: RouteDefinition) = f(rd)
+ }
+}
+
/**
* @author Martin Krasser
*/
private[camel] object Consumer {
- /**
- * Type of a route definition handler. A route definition handler is a function
- * that modifies a route definition which is passed as argument and returns the
- * modified definition.
- */
- type Handler = RouteDefinition => ProcessorDefinition[_]
-
/**
* Applies a function f to actorRef if actorRef
* references a consumer actor. A valid reference to a consumer actor is a local actor
diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala
index bbfcee1513..39c4e0bb2f 100644
--- a/akka-camel/src/main/scala/ConsumerPublisher.scala
+++ b/akka-camel/src/main/scala/ConsumerPublisher.scala
@@ -23,7 +23,7 @@ private[camel] object ConsumerPublisher extends Logging {
/**
* Creates a route to the registered consumer actor.
*/
- def handleConsumerRegistered(event: ConsumerRegistered) {
+ def handleConsumerActorRegistered(event: ConsumerActorRegistered) {
CamelContextManager.mandatoryContext.addRoutes(new ConsumerActorRouteBuilder(event))
log.info("published actor %s at endpoint %s" format (event.actorRef, event.endpointUri))
}
@@ -31,7 +31,7 @@ private[camel] object ConsumerPublisher extends Logging {
/**
* Stops the route to the already un-registered consumer actor.
*/
- def handleConsumerUnregistered(event: ConsumerUnregistered) {
+ def handleConsumerActorUnregistered(event: ConsumerActorUnregistered) {
CamelContextManager.mandatoryContext.stopRoute(event.uuid)
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.endpointUri))
}
@@ -40,24 +40,18 @@ private[camel] object ConsumerPublisher extends Logging {
* Creates a route to an typed actor method.
*/
def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) {
- val targetMethod = event.method.getName
- val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
-
- CamelContextManager.typedActorRegistry.put(objectId, event.typedActor)
- CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRouteBuilder(event.uri, objectId, targetMethod))
- log.info("published method %s of %s at endpoint %s" format (targetMethod, event.typedActor, event.uri))
+ CamelContextManager.typedActorRegistry.put(event.methodUuid, event.typedActor)
+ CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRouteBuilder(event))
+ log.info("published method %s of %s at endpoint %s" format (event.methodName, event.typedActor, event.endpointUri))
}
/**
* Stops the route to the already un-registered consumer actor method.
*/
def handleConsumerMethodUnregistered(event: ConsumerMethodUnregistered) {
- val targetMethod = event.method.getName
- val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
-
- CamelContextManager.typedActorRegistry.remove(objectId)
- CamelContextManager.mandatoryContext.stopRoute(objectId)
- log.info("unpublished method %s of %s from endpoint %s" format (targetMethod, event.typedActor, event.uri))
+ CamelContextManager.typedActorRegistry.remove(event.methodUuid)
+ CamelContextManager.mandatoryContext.stopRoute(event.methodUuid)
+ log.info("unpublished method %s of %s from endpoint %s" format (event.methodName, event.typedActor, event.endpointUri))
}
}
@@ -65,8 +59,8 @@ private[camel] object ConsumerPublisher extends Logging {
* Actor that publishes consumer actors and typed actor methods at Camel endpoints.
* The Camel context used for publishing is obtained via CamelContextManager.context.
* This actor accepts messages of type
- * akka.camel.ConsumerRegistered,
- * akka.camel.ConsumerUnregistered,
+ * akka.camel.ConsumerActorRegistered,
+ * akka.camel.ConsumerActorUnregistered,
* akka.camel.ConsumerMethodRegistered and
* akka.camel.ConsumerMethodUnregistered.
*
@@ -79,12 +73,12 @@ private[camel] class ConsumerPublisher extends Actor {
@volatile private var unregistrationLatch = new CountDownLatch(0)
protected def receive = {
- case r: ConsumerRegistered => {
- handleConsumerRegistered(r)
+ case r: ConsumerActorRegistered => {
+ handleConsumerActorRegistered(r)
registrationLatch.countDown
}
- case u: ConsumerUnregistered => {
- handleConsumerUnregistered(u)
+ case u: ConsumerActorUnregistered => {
+ handleConsumerActorUnregistered(u)
unregistrationLatch.countDown
}
case mr: ConsumerMethodRegistered => {
@@ -131,9 +125,10 @@ private[camel] abstract class ConsumerRouteBuilder(endpointUri: String, id: Stri
onRouteDefinition(startRouteDefinition(cnvopt)).to(targetUri)
}
- protected def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_]
+ protected def routeDefinitionHandler: RouteDefinitionHandler
protected def targetUri: String
+ private def onRouteDefinition(rd: RouteDefinition) = routeDefinitionHandler.onRouteDefinition(rd)
private def startRouteDefinition(bodyConversion: Option[Class[_]]): RouteDefinition = bodyConversion match {
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz)
case None => from(endpointUri).routeId(id)
@@ -145,8 +140,8 @@ private[camel] abstract class ConsumerRouteBuilder(endpointUri: String, id: Stri
*
* @author Martin Krasser
*/
-private[camel] class ConsumerActorRouteBuilder(event: ConsumerRegistered) extends ConsumerRouteBuilder(event.endpointUri, event.uuid) {
- protected def onRouteDefinition(rd: RouteDefinition) = event.routeDefinitionHandler(rd)
+private[camel] class ConsumerActorRouteBuilder(event: ConsumerActorRegistered) extends ConsumerRouteBuilder(event.endpointUri, event.uuid) {
+ protected def routeDefinitionHandler: RouteDefinitionHandler = event.routeDefinitionHandler
protected def targetUri = "actor:uuid:%s?blocking=%s" format (event.uuid, event.blocking)
}
@@ -155,9 +150,9 @@ private[camel] class ConsumerActorRouteBuilder(event: ConsumerRegistered) extend
*
* @author Martin Krasser
*/
-private[camel] class ConsumerMethodRouteBuilder(val endpointUri: String, id: String, method: String) extends ConsumerRouteBuilder(endpointUri, id) {
- protected def onRouteDefinition(rd: RouteDefinition) = rd // TODO: use provided route definition handler
- protected def targetUri = "%s:%s?method=%s" format (TypedActorComponent.InternalSchema, id, method)
+private[camel] class ConsumerMethodRouteBuilder(event: ConsumerMethodRegistered) extends ConsumerRouteBuilder(event.endpointUri, event.methodUuid) {
+ protected def routeDefinitionHandler: RouteDefinitionHandler = event.routeDefinitionHandler
+ protected def targetUri = "%s:%s?method=%s" format (TypedActorComponent.InternalSchema, event.methodUuid, event.methodName)
}
/**
@@ -179,9 +174,9 @@ private[camel] class PublishRequestor extends Actor {
protected def receive = {
case ActorRegistered(actor) =>
- for (event <- ConsumerRegistered.forConsumer(actor)) deliverCurrentEvent(event)
+ for (event <- ConsumerActorRegistered.forConsumer(actor)) deliverCurrentEvent(event)
case ActorUnregistered(actor) =>
- for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event)
+ for (event <- ConsumerActorUnregistered.forConsumer(actor)) deliverCurrentEvent(event)
case AspectInitRegistered(proxy, init) =>
for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event)
case AspectInitUnregistered(proxy, init) =>
@@ -214,76 +209,72 @@ private[camel] case class PublishRequestorInit(consumerPublisher: ActorRef)
/**
* A consumer (un)registration event.
- *
- * @author Martin Krasser
*/
private[camel] sealed trait ConsumerEvent
/**
- * Event indicating that a consumer actor has been registered at the actor registry.
- *
- * @param actorRef actor reference
- * @param actor actor implementation
- *
- * @author Martin Krasser
+ * A consumer actor (un)registration event.
*/
-private[camel] case class ConsumerRegistered(actorRef: ActorRef, actor: Consumer) extends ConsumerEvent {
- def uuid = actorRef.uuid.toString
- def endpointUri = actor.endpointUri
- def blocking = actor.blocking
- def routeDefinitionHandler = actor.routeDefinitionHandler
+private[camel] trait ConsumerActorEvent extends ConsumerEvent {
+ val actorRef: ActorRef
+ val actor: Consumer
+
+ val uuid = actorRef.uuid.toString
+ val endpointUri = actor.endpointUri
+ val blocking = actor.blocking
+ val routeDefinitionHandler = actor.routeDefinitionHandler
}
/**
- * Event indicating that a consumer actor has been unregistered from the actor registry.
- *
- * @param actorRef actor reference
- * @param actor actor implementation
- *
- * @author Martin Krasser
+ * A consumer method (un)registration event.
*/
-private[camel] case class ConsumerUnregistered(actorRef: ActorRef, actor: Consumer) extends ConsumerEvent {
- def uuid = actorRef.uuid.toString
- def endpointUri = actor.endpointUri
+private[camel] trait ConsumerMethodEvent extends ConsumerEvent {
+ val typedActor: AnyRef
+ val init: AspectInit
+ val method: Method
+
+ val uuid = init.actorRef.uuid.toString
+ val methodName = method.getName
+ val methodUuid = "%s_%s" format (uuid, methodName)
+
+ lazy val routeDefinitionHandler = consumeAnnotation.routeDefinitionHandler.newInstance
+ lazy val consumeAnnotation = method.getAnnotation(classOf[consume])
+ lazy val endpointUri = consumeAnnotation.value
}
+/**
+ * Event indicating that a consumer actor has been registered at the actor registry.
+ */
+private[camel] case class ConsumerActorRegistered(actorRef: ActorRef, actor: Consumer) extends ConsumerActorEvent
+
+/**
+ * Event indicating that a consumer actor has been unregistered from the actor registry.
+ */
+private[camel] case class ConsumerActorUnregistered(actorRef: ActorRef, actor: Consumer) extends ConsumerActorEvent
+
/**
* Event indicating that an typed actor proxy has been created for a typed actor. For each @consume
* annotated typed actor method a separate instance of this class is created.
- *
- * @param typedActor typed actor (proxy).
- * @param init
- * @param uri endpoint URI of the typed actor method
- * @param method method to be published.
- *
- * @author Martin Krasser
*/
-private[camel] case class ConsumerMethodRegistered(typedActor: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
+private[camel] case class ConsumerMethodRegistered(typedActor: AnyRef, init: AspectInit, method: Method) extends ConsumerMethodEvent
/**
* Event indicating that an typed actor has been stopped. For each @consume
* annotated typed object method a separate instance of this class is created.
- *
- * @param typedActor typed actor (proxy).
- * @param init
- * @param uri endpoint URI of the typed actor method
- * @param method method to be un-published.
- *
- * @author Martin Krasser
*/
-private[camel] case class ConsumerMethodUnregistered(typedActor: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
+private[camel] case class ConsumerMethodUnregistered(typedActor: AnyRef, init: AspectInit, method: Method) extends ConsumerMethodEvent
/**
* @author Martin Krasser
*/
-private[camel] object ConsumerRegistered {
+private[camel] object ConsumerActorRegistered {
/**
- * Creates an ConsumerRegistered event message for a consumer actor or None if
+ * Creates an ConsumerActorRegistered event message for a consumer actor or None if
* actorRef is not a consumer actor.
*/
- def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = {
- Consumer.forConsumer[ConsumerRegistered](actorRef) {
- actor => ConsumerRegistered(actorRef, actor)
+ def forConsumer(actorRef: ActorRef): Option[ConsumerActorRegistered] = {
+ Consumer.forConsumer[ConsumerActorRegistered](actorRef) {
+ actor => ConsumerActorRegistered(actorRef, actor)
}
}
}
@@ -291,14 +282,14 @@ private[camel] object ConsumerRegistered {
/**
* @author Martin Krasser
*/
-private[camel] object ConsumerUnregistered {
+private[camel] object ConsumerActorUnregistered {
/**
- * Creates an ConsumerUnregistered event message for a consumer actor or None if
+ * Creates an ConsumerActorUnregistered event message for a consumer actor or None if
* actorRef is not a consumer actor.
*/
- def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = {
- Consumer.forConsumer[ConsumerUnregistered](actorRef) {
- actor => ConsumerUnregistered(actorRef, actor)
+ def forConsumer(actorRef: ActorRef): Option[ConsumerActorUnregistered] = {
+ Consumer.forConsumer[ConsumerActorUnregistered](actorRef) {
+ actor => ConsumerActorUnregistered(actorRef, actor)
}
}
}
@@ -338,7 +329,7 @@ private[camel] object ConsumerMethodRegistered {
*/
def forConsumer(typedActor: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
ConsumerMethod.forConsumer(typedActor, init) {
- m => ConsumerMethodRegistered(typedActor, init, m.getAnnotation(classOf[consume]).value, m)
+ m => ConsumerMethodRegistered(typedActor, init, m)
}
}
}
@@ -354,7 +345,7 @@ private[camel] object ConsumerMethodUnregistered {
*/
def forConsumer(typedActor: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = {
ConsumerMethod.forConsumer(typedActor, init) {
- m => ConsumerMethodUnregistered(typedActor, init, m.getAnnotation(classOf[consume]).value, m)
+ m => ConsumerMethodUnregistered(typedActor, init, m)
}
}
}
diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java
index b3c53327cd..c34ce0cc2e 100644
--- a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java
+++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java
@@ -1,6 +1,7 @@
package akka.camel;
import akka.actor.ActorRegistry;
+import akka.actor.TypedActor;
import akka.actor.UntypedActor;
import akka.japi.SideEffect;
@@ -18,6 +19,8 @@ import static org.junit.Assert.*;
*/
public class ConsumerJavaTestBase {
+ private SampleErrorHandlingTypedConsumer consumer;
+
@BeforeClass
public static void setUpBeforeClass() {
startCamelService();
@@ -30,7 +33,7 @@ public class ConsumerJavaTestBase {
}
@Test
- public void shouldHandleExceptionAndGenerateCustomResponse() {
+ public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() {
getMandatoryService().awaitEndpointActivation(1, new SideEffect() {
public void apply() {
UntypedActor.actorOf(SampleErrorHandlingConsumer.class).start();
@@ -39,4 +42,18 @@ public class ConsumerJavaTestBase {
String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java", "hello", String.class);
assertEquals("error: hello", result);
}
+
+ @Test
+ public void shouldHandleExceptionThrownByTypedActorAndGenerateCustomResponse() {
+ getMandatoryService().awaitEndpointActivation(1, new SideEffect() {
+ public void apply() {
+ consumer = TypedActor.newInstance(
+ SampleErrorHandlingTypedConsumer.class,
+ SampleErrorHandlingTypedConsumerImpl.class);
+ }
+ });
+ String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java-typed", "hello", String.class);
+ assertEquals("error: hello", result);
+ }
+
}
diff --git a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java
index 4fec485cc5..4e35d4e6ab 100644
--- a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java
+++ b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java
@@ -1,6 +1,5 @@
package akka.camel;
-import akka.japi.Function;
import org.apache.camel.builder.Builder;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
@@ -19,8 +18,8 @@ public class SampleErrorHandlingConsumer extends UntypedConsumerActor {
}
public void preStart() {
- onRouteDefinition(new Function