Initial support for customizing routes to consumer actors.

This commit is contained in:
Martin Krasser 2010-11-05 15:37:44 +01:00
parent 46b84d716c
commit b8c2b679ca
5 changed files with 145 additions and 50 deletions

View file

@ -4,16 +4,26 @@
package akka.camel
import akka.actor._
import java.net.InetSocketAddress
import org.apache.camel.{Exchange, Processor}
import org.apache.camel.model.{RouteDefinition, ProcessorDefinition}
import akka.actor._
/**
* Mixed in by Actor implementations that consume message from Camel endpoints.
*
* @author Martin Krasser
*/
trait Consumer { self: Actor =>
import Consumer.Handler
/**
* The default route definition handler is the identity function
*/
private[camel] var routeDefinitionHandler: Handler = { rd: RouteDefinition => rd }
/**
* Returns the Camel endpoint URI to consume messages from.
*/
@ -25,6 +35,11 @@ trait Consumer { self: Actor =>
* doesn't have any effect on one-way communications (they'll never block).
*/
def blocking = false
/**
* Sets the route definition handler for creating a custom route to this consumer instance.
*/
def onRouteDefinition(h: Handler): Unit = routeDefinitionHandler = h
}
/**
@ -77,6 +92,13 @@ abstract class RemoteUntypedConsumerActor(address: InetSocketAddress) extends Re
* @author Martin Krasser
*/
private[camel] object Consumer {
/**
* Type of a route definition handler. A route definition handler is a function
* that modifies a route definition which is passed as argument and returns the
* modified definition.
*/
type Handler = RouteDefinition => ProcessorDefinition[_]
/**
* Applies a function <code>f</code> to <code>actorRef</code> if <code>actorRef</code>
* references a consumer actor. A valid reference to a consumer actor is a local actor

View file

@ -10,6 +10,7 @@ import java.lang.reflect.Method
import java.util.concurrent.CountDownLatch
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.model.{ProcessorDefinition, RouteDefinition}
import akka.actor._
import akka.camel.component.TypedActorComponent
@ -23,16 +24,16 @@ private[camel] object ConsumerPublisher extends Logging {
* Creates a route to the registered consumer actor.
*/
def handleConsumerRegistered(event: ConsumerRegistered) {
CamelContextManager.mandatoryContext.addRoutes(new ConsumerActorRoute(event.uri, event.uuid, event.blocking))
log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri))
CamelContextManager.mandatoryContext.addRoutes(new ConsumerActorRouteBuilder(event))
log.info("published actor %s at endpoint %s" format (event.actorRef, event.endpointUri))
}
/**
* Stops the route to the already un-registered consumer actor.
*/
def handleConsumerUnregistered(event: ConsumerUnregistered) {
CamelContextManager.mandatoryContext.stopRoute(event.uuid.toString)
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri))
CamelContextManager.mandatoryContext.stopRoute(event.uuid)
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.endpointUri))
}
/**
@ -43,7 +44,7 @@ private[camel] object ConsumerPublisher extends Logging {
val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
CamelContextManager.typedActorRegistry.put(objectId, event.typedActor)
CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod))
CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRouteBuilder(event.uri, objectId, targetMethod))
log.info("published method %s of %s at endpoint %s" format (targetMethod, event.typedActor, event.uri))
}
@ -117,7 +118,7 @@ private[camel] case class SetExpectedUnregistrationCount(num: Int)
*
* @author Martin Krasser
*/
private[camel] abstract class ConsumerRoute(endpointUri: String, id: String) extends RouteBuilder {
private[camel] abstract class ConsumerRouteBuilder(endpointUri: String, id: String) extends RouteBuilder {
// TODO: make conversions configurable
private val bodyConversions = Map(
"file" -> classOf[InputStream]
@ -125,39 +126,38 @@ private[camel] abstract class ConsumerRoute(endpointUri: String, id: String) ext
def configure = {
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
bodyConversions.get(schema) match {
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(targetUri)
case None => from(endpointUri).routeId(id).to(targetUri)
}
val cnvopt = bodyConversions.get(schema)
onRouteDefinition(startRouteDefinition(cnvopt)).to(targetUri)
}
protected def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_]
protected def targetUri: String
private def startRouteDefinition(bodyConversion: Option[Class[_]]): RouteDefinition = bodyConversion match {
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz)
case None => from(endpointUri).routeId(id)
}
}
/**
* Defines the route to a (untyped) consumer actor.
*
* @param endpointUri endpoint URI of the (untyped) consumer actor
* @param uuid actor uuid
* @param blocking true for blocking in-out exchanges, false otherwise
*
* @author Martin Krasser
*/
private[camel] class ConsumerActorRoute(endpointUri: String, uuid: Uuid, blocking: Boolean) extends ConsumerRoute(endpointUri, uuid.toString) {
protected override def targetUri = "actor:uuid:%s?blocking=%s" format (uuid, blocking)
private[camel] class ConsumerActorRouteBuilder(event: ConsumerRegistered) extends ConsumerRouteBuilder(event.endpointUri, event.uuid) {
protected def onRouteDefinition(rd: RouteDefinition) = event.routeDefinitionHandler(rd)
protected def targetUri = "actor:uuid:%s?blocking=%s" format (event.uuid, event.blocking)
}
/**
* Defines the route to a typed actor method.
*
* @param endpointUri endpoint URI of the consumer actor method
* @param id typed actor identifier
* @param method name of the method to invoke.
*
* @author Martin Krasser
*/
private[camel] class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends ConsumerRoute(endpointUri, id) {
protected override def targetUri = "%s:%s?method=%s" format (TypedActorComponent.InternalSchema, id, method)
private[camel] class ConsumerMethodRouteBuilder(val endpointUri: String, id: String, method: String) extends ConsumerRouteBuilder(endpointUri, id) {
protected def onRouteDefinition(rd: RouteDefinition) = rd // TODO: use provided route definition handler
protected def targetUri = "%s:%s?method=%s" format (TypedActorComponent.InternalSchema, id, method)
}
/**
@ -223,24 +223,29 @@ private[camel] sealed trait ConsumerEvent
* Event indicating that a consumer actor has been registered at the actor registry.
*
* @param actorRef actor reference
* @param uri endpoint URI of the consumer actor
* @param uuid actor uuid
* @param blocking true for blocking in-out exchanges, false otherwise
* @param actor actor implementation
*
* @author Martin Krasser
*/
private[camel] case class ConsumerRegistered(actorRef: ActorRef, uri: String, uuid: Uuid, blocking: Boolean) extends ConsumerEvent
private[camel] case class ConsumerRegistered(actorRef: ActorRef, actor: Consumer) extends ConsumerEvent {
def uuid = actorRef.uuid.toString
def endpointUri = actor.endpointUri
def blocking = actor.blocking
def routeDefinitionHandler = actor.routeDefinitionHandler
}
/**
* Event indicating that a consumer actor has been unregistered from the actor registry.
*
* @param actorRef actor reference
* @param uri endpoint URI of the consumer actor
* @param uuid actor uuid
* @param actor actor implementation
*
* @author Martin Krasser
*/
private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, uuid: Uuid) extends ConsumerEvent
private[camel] case class ConsumerUnregistered(actorRef: ActorRef, actor: Consumer) extends ConsumerEvent {
def uuid = actorRef.uuid.toString
def endpointUri = actor.endpointUri
}
/**
* Event indicating that an typed actor proxy has been created for a typed actor. For each <code>@consume</code>
@ -278,7 +283,7 @@ private[camel] object ConsumerRegistered {
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = {
Consumer.forConsumer[ConsumerRegistered](actorRef) {
target => ConsumerRegistered(actorRef, target.endpointUri, actorRef.uuid, target.blocking)
actor => ConsumerRegistered(actorRef, actor)
}
}
}
@ -293,7 +298,7 @@ private[camel] object ConsumerUnregistered {
*/
def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = {
Consumer.forConsumer[ConsumerUnregistered](actorRef) {
target => ConsumerUnregistered(actorRef, target.endpointUri, actorRef.uuid)
actor => ConsumerUnregistered(actorRef, actor)
}
}
}

View file

@ -2,8 +2,7 @@ package akka.camel
import org.junit.Test
import org.scalatest.junit.JUnitSuite
import akka.actor.{Actor, UntypedActor}
import akka.actor.{ActorRef, Actor, UntypedActor}
class ConsumerRegisteredTest extends JUnitSuite {
import ConsumerRegisteredTest._
@ -11,13 +10,13 @@ class ConsumerRegisteredTest extends JUnitSuite {
@Test def shouldCreateSomeNonBlockingPublishRequestFromConsumer = {
val c = Actor.actorOf[ConsumerActor1]
val event = ConsumerRegistered.forConsumer(c)
assert(event === Some(ConsumerRegistered(c, "mock:test1", c.uuid, false)))
assert(event === Some(ConsumerRegistered(c, consumerOf(c))))
}
@Test def shouldCreateSomeBlockingPublishRequestFromConsumer = {
val c = Actor.actorOf[ConsumerActor2]
val event = ConsumerRegistered.forConsumer(c)
assert(event === Some(ConsumerRegistered(c, "mock:test2", c.uuid, true)))
assert(event === Some(ConsumerRegistered(c, consumerOf(c))))
}
@Test def shouldCreateNoneFromConsumer = {
@ -28,13 +27,13 @@ class ConsumerRegisteredTest extends JUnitSuite {
@Test def shouldCreateSomeNonBlockingPublishRequestFromUntypedConsumer = {
val uc = UntypedActor.actorOf(classOf[SampleUntypedConsumer])
val event = ConsumerRegistered.forConsumer(uc)
assert(event === Some(ConsumerRegistered(uc, "direct:test-untyped-consumer", uc.uuid, false)))
assert(event === Some(ConsumerRegistered(uc, consumerOf(uc))))
}
@Test def shouldCreateSomeBlockingPublishRequestFromUntypedConsumer = {
val uc = UntypedActor.actorOf(classOf[SampleUntypedConsumerBlocking])
val event = ConsumerRegistered.forConsumer(uc)
assert(event === Some(ConsumerRegistered(uc, "direct:test-untyped-consumer-blocking", uc.uuid, true)))
assert(event === Some(ConsumerRegistered(uc, consumerOf(uc))))
}
@Test def shouldCreateNoneFromUntypedConsumer = {
@ -42,6 +41,8 @@ class ConsumerRegisteredTest extends JUnitSuite {
val event = ConsumerRegistered.forConsumer(a)
assert(event === None)
}
private def consumerOf(ref: ActorRef) = ref.actor.asInstanceOf[Consumer]
}
object ConsumerRegisteredTest {

View file

@ -3,7 +3,8 @@ package akka.camel
import java.util.concurrent.{TimeoutException, CountDownLatch, TimeUnit}
import org.apache.camel.CamelExecutionException
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.builder.Builder
import org.apache.camel.model.RouteDefinition
import org.scalatest.{BeforeAndAfterAll, WordSpec}
import org.scalatest.matchers.MustMatchers
@ -171,9 +172,34 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
}
}
}
"A responding, blocking consumer" when {
"activated with a custom error handler" must {
"handle thrown exceptions by generating a custom response" in {
service.awaitEndpointActivation(1) {
actorOf[ErrorHandlingConsumer].start
} must be (true)
mandatoryTemplate.requestBody("direct:error-handler-test", "hello") must equal ("error: hello")
}
}
"activated with a custom redelivery handler" must {
"handle thrown exceptions by redelivering the initial message" in {
service.awaitEndpointActivation(1) {
actorOf[RedeliveringConsumer].start
} must be (true)
mandatoryTemplate.requestBody("direct:redelivery-test", "hello") must equal ("accepted: hello")
}
}
}
}
object ConsumerTest {
trait BlockingConsumer extends Consumer { self: Actor =>
override def blocking = true
}
class TestConsumer(uri: String) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
@ -181,6 +207,53 @@ object ConsumerTest {
}
}
class TestBlocker(uri: String) extends Actor with BlockingConsumer {
self.timeout = 1000
def endpointUri = uri
protected def receive = {
case msg: Message => { /* do not reply */ }
}
}
class ErrorHandlingConsumer extends Actor with BlockingConsumer {
def endpointUri = "direct:error-handler-test"
onRouteDefinition {rd: RouteDefinition =>
rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
}
protected def receive = {
case msg: Message => throw new Exception("error: %s" format msg.body)
}
}
class RedeliveringConsumer extends Actor with BlockingConsumer {
def endpointUri = "direct:redelivery-test"
onRouteDefinition {rd: RouteDefinition =>
rd.onException(classOf[Exception]).maximumRedeliveries(1).end
}
//
// first message to this actor is not valid and will be rejected
//
var valid = false
protected def receive = {
case msg: Message => try {
respondTo(msg)
} finally {
valid = true
}
}
private def respondTo(msg: Message) =
if (valid) self.reply("accepted: %s" format msg.body)
else throw new Exception("rejected: %s" format msg.body)
}
trait TestTypedConsumer {
@consume("direct:publish-test-3")
def foo(s: String): String
@ -193,12 +266,6 @@ object ConsumerTest {
def bar(s: String) = "bar: %s" format s
}
class TestBlocker(uri: String) extends Actor with Consumer {
self.timeout = 1000
def endpointUri = uri
override def blocking = true
protected def receive = {
case msg: Message => { /* do not reply */ }
}
}
}

View file

@ -83,7 +83,7 @@ class PublishRequestorTest extends JUnitSuite {
requestor ! ActorRegistered(consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerRegistered(consumer, "mock:test", consumer.uuid, false)))
Some(ConsumerRegistered(consumer, consumer.actor.asInstanceOf[Consumer])))
}
@Test def shouldReceiveOneConsumerUnregisteredEvent = {
@ -91,7 +91,7 @@ class PublishRequestorTest extends JUnitSuite {
requestor ! ActorUnregistered(consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid)))
Some(ConsumerUnregistered(consumer, consumer.actor.asInstanceOf[Consumer])))
}
}