Customizing routes to typed consumer actors (Scala and Java API) and refactorings.

This commit is contained in:
Martin Krasser 2010-11-09 13:52:20 +01:00
parent 1913b32738
commit 5550e743a2
10 changed files with 197 additions and 114 deletions

View file

@ -24,4 +24,11 @@ public @interface consume {
*/
public abstract String value();
/**
* Route definition handler class for customizing route to annotated method.
* The handler class must have a default constructor.
*/
public abstract Class<? extends RouteDefinitionHandler> routeDefinitionHandler()
default RouteDefinitionIdentity.class;
}

View file

@ -18,12 +18,12 @@ import akka.japi.{Function => JFunction}
* @author Martin Krasser
*/
trait Consumer { self: Actor =>
import Consumer.Handler
import RouteDefinitionHandler._
/**
* The default route definition handler is the identity function
*/
private[camel] var routeDefinitionHandler: Handler = { rd: RouteDefinition => rd }
private[camel] var routeDefinitionHandler: RouteDefinitionHandler = identity
/**
* Returns the Camel endpoint URI to consume messages from.
@ -40,7 +40,14 @@ trait Consumer { self: Actor =>
/**
* Sets the route definition handler for creating a custom route to this consumer instance.
*/
def onRouteDefinition(h: Handler): Unit = routeDefinitionHandler = h
def onRouteDefinition(h: RouteDefinition => ProcessorDefinition[_]): Unit = onRouteDefinition(from(h))
/**
* Sets the route definition handler for creating a custom route to this consumer instance.
* <p>
* Java API.
*/
def onRouteDefinition(h: RouteDefinitionHandler): Unit = routeDefinitionHandler = h
}
/**
@ -67,12 +74,6 @@ trait UntypedConsumer extends Consumer { self: UntypedActor =>
* doesn't have any effect on one-way communications (they'll never block).
*/
def isBlocking() = super.blocking
/**
* Sets the route definition handler for creating a custom route to this consumer instance.
*/
def onRouteDefinition(h: JFunction[RouteDefinition, ProcessorDefinition[_]]): Unit =
onRouteDefinition { rd: RouteDefinition => h(rd) }
}
/**
@ -95,17 +96,46 @@ abstract class RemoteUntypedConsumerActor(address: InetSocketAddress) extends Re
def this(host: String, port: Int) = this(new InetSocketAddress(host, port))
}
/**
* A callback handler for route definitions to consumer actors.
*
* @author Martin Krasser
*/
trait RouteDefinitionHandler {
def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_]
}
/**
* The identity route definition handler.
*
* @author Martin Krasser
*
*/
class RouteDefinitionIdentity extends RouteDefinitionHandler {
def onRouteDefinition(rd: RouteDefinition) = rd
}
/**
* @author Martin Krasser
*/
object RouteDefinitionHandler {
/**
* Returns the identity route definition handler
*/
val identity = new RouteDefinitionIdentity
/**
* Created a route definition handler from the given function.
*/
def from(f: RouteDefinition => ProcessorDefinition[_]) = new RouteDefinitionHandler {
def onRouteDefinition(rd: RouteDefinition) = f(rd)
}
}
/**
* @author Martin Krasser
*/
private[camel] object Consumer {
/**
* Type of a route definition handler. A route definition handler is a function
* that modifies a route definition which is passed as argument and returns the
* modified definition.
*/
type Handler = RouteDefinition => ProcessorDefinition[_]
/**
* Applies a function <code>f</code> to <code>actorRef</code> if <code>actorRef</code>
* references a consumer actor. A valid reference to a consumer actor is a local actor

View file

@ -23,7 +23,7 @@ private[camel] object ConsumerPublisher extends Logging {
/**
* Creates a route to the registered consumer actor.
*/
def handleConsumerRegistered(event: ConsumerRegistered) {
def handleConsumerActorRegistered(event: ConsumerActorRegistered) {
CamelContextManager.mandatoryContext.addRoutes(new ConsumerActorRouteBuilder(event))
log.info("published actor %s at endpoint %s" format (event.actorRef, event.endpointUri))
}
@ -31,7 +31,7 @@ private[camel] object ConsumerPublisher extends Logging {
/**
* Stops the route to the already un-registered consumer actor.
*/
def handleConsumerUnregistered(event: ConsumerUnregistered) {
def handleConsumerActorUnregistered(event: ConsumerActorUnregistered) {
CamelContextManager.mandatoryContext.stopRoute(event.uuid)
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.endpointUri))
}
@ -40,24 +40,18 @@ private[camel] object ConsumerPublisher extends Logging {
* Creates a route to an typed actor method.
*/
def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) {
val targetMethod = event.method.getName
val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
CamelContextManager.typedActorRegistry.put(objectId, event.typedActor)
CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRouteBuilder(event.uri, objectId, targetMethod))
log.info("published method %s of %s at endpoint %s" format (targetMethod, event.typedActor, event.uri))
CamelContextManager.typedActorRegistry.put(event.methodUuid, event.typedActor)
CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRouteBuilder(event))
log.info("published method %s of %s at endpoint %s" format (event.methodName, event.typedActor, event.endpointUri))
}
/**
* Stops the route to the already un-registered consumer actor method.
*/
def handleConsumerMethodUnregistered(event: ConsumerMethodUnregistered) {
val targetMethod = event.method.getName
val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
CamelContextManager.typedActorRegistry.remove(objectId)
CamelContextManager.mandatoryContext.stopRoute(objectId)
log.info("unpublished method %s of %s from endpoint %s" format (targetMethod, event.typedActor, event.uri))
CamelContextManager.typedActorRegistry.remove(event.methodUuid)
CamelContextManager.mandatoryContext.stopRoute(event.methodUuid)
log.info("unpublished method %s of %s from endpoint %s" format (event.methodName, event.typedActor, event.endpointUri))
}
}
@ -65,8 +59,8 @@ private[camel] object ConsumerPublisher extends Logging {
* Actor that publishes consumer actors and typed actor methods at Camel endpoints.
* The Camel context used for publishing is obtained via CamelContextManager.context.
* This actor accepts messages of type
* akka.camel.ConsumerRegistered,
* akka.camel.ConsumerUnregistered,
* akka.camel.ConsumerActorRegistered,
* akka.camel.ConsumerActorUnregistered,
* akka.camel.ConsumerMethodRegistered and
* akka.camel.ConsumerMethodUnregistered.
*
@ -79,12 +73,12 @@ private[camel] class ConsumerPublisher extends Actor {
@volatile private var unregistrationLatch = new CountDownLatch(0)
protected def receive = {
case r: ConsumerRegistered => {
handleConsumerRegistered(r)
case r: ConsumerActorRegistered => {
handleConsumerActorRegistered(r)
registrationLatch.countDown
}
case u: ConsumerUnregistered => {
handleConsumerUnregistered(u)
case u: ConsumerActorUnregistered => {
handleConsumerActorUnregistered(u)
unregistrationLatch.countDown
}
case mr: ConsumerMethodRegistered => {
@ -131,9 +125,10 @@ private[camel] abstract class ConsumerRouteBuilder(endpointUri: String, id: Stri
onRouteDefinition(startRouteDefinition(cnvopt)).to(targetUri)
}
protected def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_]
protected def routeDefinitionHandler: RouteDefinitionHandler
protected def targetUri: String
private def onRouteDefinition(rd: RouteDefinition) = routeDefinitionHandler.onRouteDefinition(rd)
private def startRouteDefinition(bodyConversion: Option[Class[_]]): RouteDefinition = bodyConversion match {
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz)
case None => from(endpointUri).routeId(id)
@ -145,8 +140,8 @@ private[camel] abstract class ConsumerRouteBuilder(endpointUri: String, id: Stri
*
* @author Martin Krasser
*/
private[camel] class ConsumerActorRouteBuilder(event: ConsumerRegistered) extends ConsumerRouteBuilder(event.endpointUri, event.uuid) {
protected def onRouteDefinition(rd: RouteDefinition) = event.routeDefinitionHandler(rd)
private[camel] class ConsumerActorRouteBuilder(event: ConsumerActorRegistered) extends ConsumerRouteBuilder(event.endpointUri, event.uuid) {
protected def routeDefinitionHandler: RouteDefinitionHandler = event.routeDefinitionHandler
protected def targetUri = "actor:uuid:%s?blocking=%s" format (event.uuid, event.blocking)
}
@ -155,9 +150,9 @@ private[camel] class ConsumerActorRouteBuilder(event: ConsumerRegistered) extend
*
* @author Martin Krasser
*/
private[camel] class ConsumerMethodRouteBuilder(val endpointUri: String, id: String, method: String) extends ConsumerRouteBuilder(endpointUri, id) {
protected def onRouteDefinition(rd: RouteDefinition) = rd // TODO: use provided route definition handler
protected def targetUri = "%s:%s?method=%s" format (TypedActorComponent.InternalSchema, id, method)
private[camel] class ConsumerMethodRouteBuilder(event: ConsumerMethodRegistered) extends ConsumerRouteBuilder(event.endpointUri, event.methodUuid) {
protected def routeDefinitionHandler: RouteDefinitionHandler = event.routeDefinitionHandler
protected def targetUri = "%s:%s?method=%s" format (TypedActorComponent.InternalSchema, event.methodUuid, event.methodName)
}
/**
@ -179,9 +174,9 @@ private[camel] class PublishRequestor extends Actor {
protected def receive = {
case ActorRegistered(actor) =>
for (event <- ConsumerRegistered.forConsumer(actor)) deliverCurrentEvent(event)
for (event <- ConsumerActorRegistered.forConsumer(actor)) deliverCurrentEvent(event)
case ActorUnregistered(actor) =>
for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event)
for (event <- ConsumerActorUnregistered.forConsumer(actor)) deliverCurrentEvent(event)
case AspectInitRegistered(proxy, init) =>
for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event)
case AspectInitUnregistered(proxy, init) =>
@ -214,76 +209,72 @@ private[camel] case class PublishRequestorInit(consumerPublisher: ActorRef)
/**
* A consumer (un)registration event.
*
* @author Martin Krasser
*/
private[camel] sealed trait ConsumerEvent
/**
* Event indicating that a consumer actor has been registered at the actor registry.
*
* @param actorRef actor reference
* @param actor actor implementation
*
* @author Martin Krasser
* A consumer actor (un)registration event.
*/
private[camel] case class ConsumerRegistered(actorRef: ActorRef, actor: Consumer) extends ConsumerEvent {
def uuid = actorRef.uuid.toString
def endpointUri = actor.endpointUri
def blocking = actor.blocking
def routeDefinitionHandler = actor.routeDefinitionHandler
private[camel] trait ConsumerActorEvent extends ConsumerEvent {
val actorRef: ActorRef
val actor: Consumer
val uuid = actorRef.uuid.toString
val endpointUri = actor.endpointUri
val blocking = actor.blocking
val routeDefinitionHandler = actor.routeDefinitionHandler
}
/**
* Event indicating that a consumer actor has been unregistered from the actor registry.
*
* @param actorRef actor reference
* @param actor actor implementation
*
* @author Martin Krasser
* A consumer method (un)registration event.
*/
private[camel] case class ConsumerUnregistered(actorRef: ActorRef, actor: Consumer) extends ConsumerEvent {
def uuid = actorRef.uuid.toString
def endpointUri = actor.endpointUri
private[camel] trait ConsumerMethodEvent extends ConsumerEvent {
val typedActor: AnyRef
val init: AspectInit
val method: Method
val uuid = init.actorRef.uuid.toString
val methodName = method.getName
val methodUuid = "%s_%s" format (uuid, methodName)
lazy val routeDefinitionHandler = consumeAnnotation.routeDefinitionHandler.newInstance
lazy val consumeAnnotation = method.getAnnotation(classOf[consume])
lazy val endpointUri = consumeAnnotation.value
}
/**
* Event indicating that a consumer actor has been registered at the actor registry.
*/
private[camel] case class ConsumerActorRegistered(actorRef: ActorRef, actor: Consumer) extends ConsumerActorEvent
/**
* Event indicating that a consumer actor has been unregistered from the actor registry.
*/
private[camel] case class ConsumerActorUnregistered(actorRef: ActorRef, actor: Consumer) extends ConsumerActorEvent
/**
* Event indicating that an typed actor proxy has been created for a typed actor. For each <code>@consume</code>
* annotated typed actor method a separate instance of this class is created.
*
* @param typedActor typed actor (proxy).
* @param init
* @param uri endpoint URI of the typed actor method
* @param method method to be published.
*
* @author Martin Krasser
*/
private[camel] case class ConsumerMethodRegistered(typedActor: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
private[camel] case class ConsumerMethodRegistered(typedActor: AnyRef, init: AspectInit, method: Method) extends ConsumerMethodEvent
/**
* Event indicating that an typed actor has been stopped. For each <code>@consume</code>
* annotated typed object method a separate instance of this class is created.
*
* @param typedActor typed actor (proxy).
* @param init
* @param uri endpoint URI of the typed actor method
* @param method method to be un-published.
*
* @author Martin Krasser
*/
private[camel] case class ConsumerMethodUnregistered(typedActor: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
private[camel] case class ConsumerMethodUnregistered(typedActor: AnyRef, init: AspectInit, method: Method) extends ConsumerMethodEvent
/**
* @author Martin Krasser
*/
private[camel] object ConsumerRegistered {
private[camel] object ConsumerActorRegistered {
/**
* Creates an ConsumerRegistered event message for a consumer actor or None if
* Creates an ConsumerActorRegistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = {
Consumer.forConsumer[ConsumerRegistered](actorRef) {
actor => ConsumerRegistered(actorRef, actor)
def forConsumer(actorRef: ActorRef): Option[ConsumerActorRegistered] = {
Consumer.forConsumer[ConsumerActorRegistered](actorRef) {
actor => ConsumerActorRegistered(actorRef, actor)
}
}
}
@ -291,14 +282,14 @@ private[camel] object ConsumerRegistered {
/**
* @author Martin Krasser
*/
private[camel] object ConsumerUnregistered {
private[camel] object ConsumerActorUnregistered {
/**
* Creates an ConsumerUnregistered event message for a consumer actor or None if
* Creates an ConsumerActorUnregistered event message for a consumer actor or None if
* <code>actorRef</code> is not a consumer actor.
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = {
Consumer.forConsumer[ConsumerUnregistered](actorRef) {
actor => ConsumerUnregistered(actorRef, actor)
def forConsumer(actorRef: ActorRef): Option[ConsumerActorUnregistered] = {
Consumer.forConsumer[ConsumerActorUnregistered](actorRef) {
actor => ConsumerActorUnregistered(actorRef, actor)
}
}
}
@ -338,7 +329,7 @@ private[camel] object ConsumerMethodRegistered {
*/
def forConsumer(typedActor: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
ConsumerMethod.forConsumer(typedActor, init) {
m => ConsumerMethodRegistered(typedActor, init, m.getAnnotation(classOf[consume]).value, m)
m => ConsumerMethodRegistered(typedActor, init, m)
}
}
}
@ -354,7 +345,7 @@ private[camel] object ConsumerMethodUnregistered {
*/
def forConsumer(typedActor: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = {
ConsumerMethod.forConsumer(typedActor, init) {
m => ConsumerMethodUnregistered(typedActor, init, m.getAnnotation(classOf[consume]).value, m)
m => ConsumerMethodUnregistered(typedActor, init, m)
}
}
}

View file

@ -1,6 +1,7 @@
package akka.camel;
import akka.actor.ActorRegistry;
import akka.actor.TypedActor;
import akka.actor.UntypedActor;
import akka.japi.SideEffect;
@ -18,6 +19,8 @@ import static org.junit.Assert.*;
*/
public class ConsumerJavaTestBase {
private SampleErrorHandlingTypedConsumer consumer;
@BeforeClass
public static void setUpBeforeClass() {
startCamelService();
@ -30,7 +33,7 @@ public class ConsumerJavaTestBase {
}
@Test
public void shouldHandleExceptionAndGenerateCustomResponse() {
public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() {
getMandatoryService().awaitEndpointActivation(1, new SideEffect() {
public void apply() {
UntypedActor.actorOf(SampleErrorHandlingConsumer.class).start();
@ -39,4 +42,18 @@ public class ConsumerJavaTestBase {
String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java", "hello", String.class);
assertEquals("error: hello", result);
}
@Test
public void shouldHandleExceptionThrownByTypedActorAndGenerateCustomResponse() {
getMandatoryService().awaitEndpointActivation(1, new SideEffect() {
public void apply() {
consumer = TypedActor.newInstance(
SampleErrorHandlingTypedConsumer.class,
SampleErrorHandlingTypedConsumerImpl.class);
}
});
String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java-typed", "hello", String.class);
assertEquals("error: hello", result);
}
}

View file

@ -1,6 +1,5 @@
package akka.camel;
import akka.japi.Function;
import org.apache.camel.builder.Builder;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
@ -19,8 +18,8 @@ public class SampleErrorHandlingConsumer extends UntypedConsumerActor {
}
public void preStart() {
onRouteDefinition(new Function<RouteDefinition, ProcessorDefinition<?>>() {
public ProcessorDefinition<?> apply(RouteDefinition rd) {
onRouteDefinition(new RouteDefinitionHandler() {
public ProcessorDefinition<?> onRouteDefinition(RouteDefinition rd) {
return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
}
});

View file

@ -0,0 +1,11 @@
package akka.camel;
/**
* @author Martin Krasser
*/
public interface SampleErrorHandlingTypedConsumer {
@consume(value="direct:error-handler-test-java-typed", routeDefinitionHandler=SampleRouteDefinitionHandler.class)
String willFail(String s);
}

View file

@ -0,0 +1,14 @@
package akka.camel;
import akka.actor.TypedActor;
/**
* @author Martin Krasser
*/
public class SampleErrorHandlingTypedConsumerImpl extends TypedActor implements SampleErrorHandlingTypedConsumer {
public String willFail(String s) {
throw new RuntimeException(String.format("error: %s", s));
}
}

View file

@ -0,0 +1,14 @@
package akka.camel;
import org.apache.camel.builder.Builder;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
/**
* @author Martin Krasser
*/
public class SampleRouteDefinitionHandler implements RouteDefinitionHandler {
public ProcessorDefinition<?> onRouteDefinition(RouteDefinition rd) {
return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
}
}

View file

@ -9,36 +9,36 @@ class ConsumerRegisteredTest extends JUnitSuite {
@Test def shouldCreateSomeNonBlockingPublishRequestFromConsumer = {
val c = Actor.actorOf[ConsumerActor1]
val event = ConsumerRegistered.forConsumer(c)
assert(event === Some(ConsumerRegistered(c, consumerOf(c))))
val event = ConsumerActorRegistered.forConsumer(c)
assert(event === Some(ConsumerActorRegistered(c, consumerOf(c))))
}
@Test def shouldCreateSomeBlockingPublishRequestFromConsumer = {
val c = Actor.actorOf[ConsumerActor2]
val event = ConsumerRegistered.forConsumer(c)
assert(event === Some(ConsumerRegistered(c, consumerOf(c))))
val event = ConsumerActorRegistered.forConsumer(c)
assert(event === Some(ConsumerActorRegistered(c, consumerOf(c))))
}
@Test def shouldCreateNoneFromConsumer = {
val event = ConsumerRegistered.forConsumer(Actor.actorOf[PlainActor])
val event = ConsumerActorRegistered.forConsumer(Actor.actorOf[PlainActor])
assert(event === None)
}
@Test def shouldCreateSomeNonBlockingPublishRequestFromUntypedConsumer = {
val uc = UntypedActor.actorOf(classOf[SampleUntypedConsumer])
val event = ConsumerRegistered.forConsumer(uc)
assert(event === Some(ConsumerRegistered(uc, consumerOf(uc))))
val event = ConsumerActorRegistered.forConsumer(uc)
assert(event === Some(ConsumerActorRegistered(uc, consumerOf(uc))))
}
@Test def shouldCreateSomeBlockingPublishRequestFromUntypedConsumer = {
val uc = UntypedActor.actorOf(classOf[SampleUntypedConsumerBlocking])
val event = ConsumerRegistered.forConsumer(uc)
assert(event === Some(ConsumerRegistered(uc, consumerOf(uc))))
val event = ConsumerActorRegistered.forConsumer(uc)
assert(event === Some(ConsumerActorRegistered(uc, consumerOf(uc))))
}
@Test def shouldCreateNoneFromUntypedConsumer = {
val a = UntypedActor.actorOf(classOf[SampleUntypedActor])
val event = ConsumerRegistered.forConsumer(a)
val event = ConsumerActorRegistered.forConsumer(a)
assert(event === None)
}

View file

@ -40,9 +40,9 @@ class PublishRequestorTest extends JUnitSuite {
val obj = TypedActor.newInstance(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl])
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val event = (publisher !! GetRetainedMessage).as[ConsumerMethodRegistered].get
assert(event.uri === "direct:foo")
assert(event.endpointUri === "direct:foo")
assert(event.typedActor === obj)
assert(event.method.getName === "foo")
assert(event.methodName === "foo")
}
@Test def shouldReceiveOneConsumerMethodUnregisteredEvent = {
@ -52,9 +52,9 @@ class PublishRequestorTest extends JUnitSuite {
TypedActor.stop(obj)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val event = (publisher !! GetRetainedMessage).as[ConsumerMethodUnregistered].get
assert(event.uri === "direct:foo")
assert(event.endpointUri === "direct:foo")
assert(event.typedActor === obj)
assert(event.method.getName === "foo")
assert(event.methodName === "foo")
}
@Test def shouldReceiveThreeConsumerMethodRegisteredEvents = {
@ -83,7 +83,7 @@ class PublishRequestorTest extends JUnitSuite {
requestor ! ActorRegistered(consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerRegistered(consumer, consumer.actor.asInstanceOf[Consumer])))
Some(ConsumerActorRegistered(consumer, consumer.actor.asInstanceOf[Consumer])))
}
@Test def shouldReceiveOneConsumerUnregisteredEvent = {
@ -91,7 +91,7 @@ class PublishRequestorTest extends JUnitSuite {
requestor ! ActorUnregistered(consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerUnregistered(consumer, consumer.actor.asInstanceOf[Consumer])))
Some(ConsumerActorUnregistered(consumer, consumer.actor.asInstanceOf[Consumer])))
}
}