Producer trait for producing messages to Camel endpoints (sync/async, oneway/twoway), Immutable representation of Camel message, consumer/producer examples, refactorings/improvements/cleanups.

This commit is contained in:
Martin Krasser 2010-03-05 19:38:23 +01:00
parent 3b62a7a658
commit c0b41379e6
16 changed files with 822 additions and 185 deletions

View file

@ -0,0 +1,97 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel
import org.apache.camel.{ProducerTemplate, CamelContext}
import org.apache.camel.impl.DefaultCamelContext
import se.scalablesolutions.akka.util.Logging
/**
* Defines the lifecycle of a CamelContext. Allowed state transitions are
* init -> start -> stop -> init -> ... etc.
*
* @author Martin Krasser
*/
trait CamelContextLifecycle extends Logging {
// TODO: enforce correct state transitions
// valid: init -> start -> stop -> init ...
private var _context: CamelContext = _
private var _template: ProducerTemplate = _
private var _initialized = false
private var _started = false
/**
* Returns the managed CamelContext.
*/
protected def context: CamelContext = _context
/**
* Returns the managed ProducerTemplate.
*/
protected def template: ProducerTemplate = _template
/**
* Sets the managed CamelContext.
*/
protected def context_= (context: CamelContext) { _context = context }
/**
* Sets the managed ProducerTemplate.
*/
protected def template_= (template: ProducerTemplate) { _template = template }
def initialized = _initialized
def started = _started
/**
* Starts the CamelContext and ProducerTemplate.
*/
def start() {
context.start
template.start
_started = true
log.info("Camel context started")
}
/**
* Stops the CamelContext and ProducerTemplate.
*/
def stop() {
template.stop
context.stop
_initialized = false
_started = false
log.info("Camel context stopped")
}
/**
* Initializes this lifecycle object with the a DefaultCamelContext.
*/
def init() {
init(new DefaultCamelContext)
}
/**
* Initializes this lifecycle object with the given CamelContext.
*/
def init(context: CamelContext) {
this.context = context
this.template = context.createProducerTemplate
_initialized = true
log.info("Camel context initialized")
}
}
/**
* Makes a global CamelContext and ProducerTemplate accessible to applications. The lifecycle
* of these objects is managed by se.scalablesolutions.akka.camel.service.CamelService.
*/
object CamelContextManager extends CamelContextLifecycle {
override def context: CamelContext = super.context
override def template: ProducerTemplate = super.template
}

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.camel
import se.scalablesolutions.akka.actor.Actor
/**
* Mixed in by Actor subclasses to be Camel endpoint consumers.
* Mixed in by Actor implementations that consume message from Camel endpoints.
*
* @author Martin Krasser
*/

View file

@ -4,58 +4,241 @@
package se.scalablesolutions.akka.camel
import org.apache.camel.{Message => CamelMessage}
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.{Exchange, Message => CamelMessage}
import org.apache.camel.util.ExchangeHelper
import scala.collection.jcl.{Map => MapWrapper}
/**
* An immutable representation of a Camel message. Actor classes that mix in
* se.scalablesolutions.akka.camel.Producer or
* se.scalablesolutions.akka.camel.Consumer use this message type for communication.
*
* @author Martin Krasser
*/
class Message(val body: Any, val headers: Map[String, Any]) {
case class Message(val body: Any, val headers: Map[String, Any]) {
/**
* Creates a message with a body and an empty header map.
*/
def this(body: Any) = this(body, Map.empty)
def bodyAs[T](clazz: Class[T]): T = Message.converter.mandatoryConvertTo[T](clazz, body)
/**
* Returns the body of the message converted to the type given by the <code>clazz</code>
* argument. Conversion is done using Camel's type converter. The type converter is obtained
* from the CamelContext managed by CamelContextManager. Applications have to ensure proper
* initialization of CamelContextManager.
*
* @see CamelContextManager.
*/
def bodyAs[T](clazz: Class[T]): T = {
CamelContextManager.context.getTypeConverter.mandatoryConvertTo[T](clazz, body)
}
/**
* Returns those headers from this message whose name is contained in <code>names</code>.
*/
def headers(names: Set[String]): Map[String, Any] = {
headers.filter(names contains _._1)
}
/**
* Creates a Message with a new <code>body</code> using a <code>transformer</code> function.
*/
def transformBody[A](transformer: A => Any): Message = setBody(transformer(body.asInstanceOf[A]))
/**
* Creates a Message with a new <code>body</code> converted to type <code>clazz</code>.
*
* @see Message#bodyAs(Class)
*/
def setBodyAs[T](clazz: Class[T]): Message = setBody(bodyAs(clazz))
/**
* Creates a Message with a new <code>body</code>.
*/
def setBody(body: Any) = new Message(body, this.headers)
/**
* Creates a new Message with new <code>headers</code>.
*/
def setHeaders(headers: Map[String, Any]) = new Message(this.body, headers)
/**
* Creates a new Message with the <code>headers</code> argument added to the existing headers.
*/
def addHeaders(headers: Map[String, Any]) = new Message(this.body, this.headers ++ headers)
/**
* Creates a new Message with the <code>header</code> argument added to the existing headers.
*/
def addHeader(header: (String, Any)) = new Message(this.body, this.headers + header)
/**
* Creates a new Message where the header with name <code>headerName</code> is removed from
* the existing headers.
*/
def removeHeader(headerName: String) = new Message(this.body, this.headers - headerName)
}
/**
* Companion object of Message class.
*
* @author Martin Krasser
*/
object Message {
/**
* Message header to correlate request with response messages. Applications that send
* messages to a Producer actor may want to set this header on the request message
* so that it can be correlated with an asynchronous response. Messages send to Consumer
* actors have this header already set.
*/
val MessageExchangeId = "MessageExchangeId"
val converter = new DefaultCamelContext().getTypeConverter
/**
* Creates a new Message with <code>body</code> as message body and an empty header map.
*/
def apply(body: Any) = new Message(body)
def apply(body: Any, headers: Map[String, Any]) = new Message(body, headers)
/**
* Creates a canonical form of the given message <code>msg</code>. If <code>msg</code> of type
* Message then <code>msg</code> is returned, otherwise <code>msg</code> is set as body of a
* newly created Message object.
*/
def canonicalize(msg: Any) = msg match {
case mobj: Message => mobj
case body => new Message(body)
}
}
def apply(cm: CamelMessage) =
new Message(cm.getBody, Map.empty ++ MapWrapper[String, AnyRef](cm.getHeaders).elements)
/**
* An immutable representation of a failed Camel exchange. It contains the failure cause
* obtained from Exchange.getException and the headers from either the Exchange.getIn
* message or Exchange.getOut message, depending on the exchange pattern.
*
* @author Martin Krasser
*/
case class Failure(val cause: Throwable, val headers: Map[String, Any])
/**
* Adapter for converting an org.apache.camel.Exchange to and from Message and Failure objects.
*
* @author Martin Krasser
*/
class CamelExchangeAdapter(exchange: Exchange) {
import CamelMessageConversion.toMessageAdapter
/**
* Sets Exchange.getIn from the given Message object.
*/
def fromRequestMessage(msg: Message): Exchange = { requestMessage.fromMessage(msg); exchange }
/**
* Depending on the exchange pattern, sets Exchange.getIn or Exchange.getOut from the given
* Message object. If the exchange is out-capable then the Exchange.getOut is set, otherwise
* Exchange.getIn.
*/
def fromResponseMessage(msg: Message): Exchange = { responseMessage.fromMessage(msg); exchange }
/**
* Creates a Message object from Exchange.getIn.
*/
def toRequestMessage: Message = toRequestMessage(Map.empty)
/**
* Depending on the exchange pattern, creates a Message object from Exchange.getIn or Exchange.getOut.
* If the exchange is out-capable then the Exchange.getOut is set, otherwise Exchange.getIn.
*/
def toResponseMessage: Message = toResponseMessage(Map.empty)
/**
* Creates a Failure object from the adapted Exchange.
*
* @see Failure
*/
def toFailureMessage: Failure = toFailureMessage(Map.empty)
/**
* Creates a Message object from Exchange.getIn.
*
* @param headers additional headers to set on the created Message in addition to those
* in the Camel message.
*/
def toRequestMessage(headers: Map[String, Any]): Message = requestMessage.toMessage(headers)
/**
* Depending on the exchange pattern, creates a Message object from Exchange.getIn or Exchange.getOut.
* If the exchange is out-capable then the Exchange.getOut is set, otherwise Exchange.getIn.
*
* @param headers additional headers to set on the created Message in addition to those
* in the Camel message.
*/
def toResponseMessage(headers: Map[String, Any]): Message = responseMessage.toMessage(headers)
/**
* Creates a Failure object from the adapted Exchange.
*
* @param headers additional headers to set on the created Message in addition to those
* in the Camel message.
*
* @see Failure
*/
def toFailureMessage(headers: Map[String, Any]): Failure = Failure(exchange.getException, headers ++ responseMessage.toMessage.headers)
private def requestMessage = exchange.getIn
private def responseMessage = ExchangeHelper.getResultMessage(exchange)
}
/**
* @author Martin Krasser
* Adapter for converting an org.apache.camel.Message to and from Message objects.
*
* @author Martin Krasser
*/
class CamelMessageWrapper(val cm: CamelMessage) {
def from(m: Message): CamelMessage = {
class CamelMessageAdapter(val cm: CamelMessage) {
/**
* Set the adapted Camel message from the given Message object.
*/
def fromMessage(m: Message): CamelMessage = {
cm.setBody(m.body)
for (h <- m.headers) {
cm.getHeaders.put(h._1, h._2.asInstanceOf[AnyRef])
}
for (h <- m.headers) cm.getHeaders.put(h._1, h._2.asInstanceOf[AnyRef])
cm
}
/**
* Creates a new Message object from the adapted Camel message.
*/
def toMessage: Message = toMessage(Map.empty)
/**
* Creates a new Message object from the adapted Camel message.
*
* @param headers additional headers to set on the created Message in addition to those
* in the Camel message.
*/
def toMessage(headers: Map[String, Any]): Message = {
Message(cm.getBody, cmHeaders(headers, cm))
}
private def cmHeaders(headers: Map[String, Any], cm: CamelMessage) = {
headers ++ MapWrapper[String, AnyRef](cm.getHeaders).elements
}
}
/**
* @author Martin Krasser
* Defines conversion methods to CamelExchangeAdapter and CamelMessageAdapter. Imported by applications
* that implicitly want to use conversion methods of CamelExchangeAdapter and CamelMessageAdapter.
*/
object CamelMessageWrapper {
implicit def wrapCamelMessage(cm: CamelMessage): CamelMessageWrapper = new CamelMessageWrapper(cm)
object CamelMessageConversion {
/**
* Creates an CamelExchangeAdapter for the given Camel exchange.
*/
implicit def toExchangeAdapter(ce: Exchange): CamelExchangeAdapter = new CamelExchangeAdapter(ce)
/**
* Creates an CamelMessageAdapter for the given Camel message.
*/
implicit def toMessageAdapter(cm: CamelMessage): CamelMessageAdapter = new CamelMessageAdapter(cm)
}

View file

@ -0,0 +1,199 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel
import CamelMessageConversion.toExchangeAdapter
import org.apache.camel.{Processor, ExchangePattern, Exchange, ProducerTemplate}
import org.apache.camel.impl.DefaultExchange
import org.apache.camel.spi.Synchronization
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.CompletableFutureResult
import se.scalablesolutions.akka.util.Logging
/**
* Mixed in by Actor implementations that produce messages to Camel endpoints.
*
* @author Martin Krasser
*/
trait Producer {
self: Actor =>
/**
* If set to true (default), communication with the Camel endpoint is done via the Camel
* <a href="http://camel.apache.org/async.html">Async API</a>. Camel then processes the
* message in a separate thread. If set to false, the actor thread is blocked until Camel
* has finished processing the produced message.
*/
def async: Boolean = true
/**
* If set to false (default), this producer expects a response message from the Camel endpoint.
* If set to true, this producer communicates with the Camel endpoint with an in-only message
* exchange pattern (fire and forget).
*/
def oneway: Boolean = false
/**
* Returns the Camel endpoint URI to produce messages to.
*/
def endpointUri: String
/**
* Returns the names of message headers to copy from a request message to a response message.
* By default only the Message.MessageExchangeId is copied. Applications may override this to
* define an application-specific set of message headers to copy.
*/
def headersToCopy: Set[String] = Set(Message.MessageExchangeId)
/**
* Returns the producer template from the CamelContextManager. Applications either have to ensure
* proper initialization of CamelContextManager or override this method.
*
* @see CamelContextManager.
*/
protected def template: ProducerTemplate = CamelContextManager.template
/**
* Initiates a one-way (in-only) message exchange to the Camel endpoint given by
* <code>endpointUri</code>. This method blocks until Camel finishes processing
* the message exchange.
*
* @param msg: the message to produce. The message is converted to its canonical
* representation via Message.canonicalize.
*/
protected def produceOneway(msg: Any): Unit = {
template.send(endpointUri, createInOnlyExchange.fromRequestMessage(Message.canonicalize(msg)))
}
/**
* Initiates a one-way (in-only) message exchange to the Camel endpoint given by
* <code>endpointUri</code>. This method triggers asynchronous processing of the
* message exchange by Camel.
*
* @param msg: the message to produce. The message is converted to its canonical
* representation via Message.canonicalize.
*/
protected def produceOnewayAsync(msg: Any): Unit = {
template.asyncSend(endpointUri, createInOnlyExchange.fromRequestMessage(Message.canonicalize(msg)))
}
/**
* Initiates a two-way (in-out) message exchange to the Camel endpoint given by
* <code>endpointUri</code>. This method blocks until Camel finishes processing
* the message exchange.
*
* @param msg: the message to produce. The message is converted to its canonical
* representation via Message.canonicalize.
* @return either a response Message or a Failure object.
*/
protected def produce(msg: Any): Any = {
val cmsg = Message.canonicalize(msg)
val requestProcessor = new Processor() {
def process(exchange: Exchange) = exchange.fromRequestMessage(cmsg)
}
val result = template.request(endpointUri, requestProcessor)
if (result.isFailed)
result.toFailureMessage(cmsg.headers(headersToCopy))
else
result.toResponseMessage(cmsg.headers(headersToCopy))
}
/**
* Initiates a two-way (in-out) message exchange to the Camel endpoint given by
* <code>endpointUri</code>. This method triggers asynchronous processing of the
* message exchange by Camel. The response message is returned asynchronously to
* the original sender (or sender future).
*
* @param msg: the message to produce. The message is converted to its canonical
* representation via Message.canonicalize.
* @return either a response Message or a Failure object.
* @see ProducerResponseSender
*/
protected def produceAsync(msg: Any): Unit = {
val cmsg = Message.canonicalize(msg)
val sync = new ProducerResponseSender(cmsg.headers(headersToCopy), this.sender, this.senderFuture, this)
template.asyncCallback(endpointUri, createInOutExchange.fromRequestMessage(cmsg), sync)
}
/**
* Default implementation for Actor.receive. Implementors may choose to
* <code>def receive = produce</code>. This partial function calls one of
* the protected produce methods depending on the return values of
* <code>oneway</code> and <code>async</code>.
*/
protected def produce: PartialFunction[Any, Unit] = {
case msg => {
if ( oneway && !async) produceOneway(msg)
else if ( oneway && async) produceOnewayAsync(msg)
else if (!oneway && !async) reply(produce(msg))
else /*(!oneway && async)*/ produceAsync(msg)
}
}
/**
* Creates a new in-only Exchange.
*/
protected def createInOnlyExchange: Exchange = createExchange(ExchangePattern.InOnly)
/**
* Creates a new in-out Exchange.
*/
protected def createInOutExchange: Exchange = createExchange(ExchangePattern.InOut)
/**
* Creates a new Exchange with given pattern from the CamelContext managed by
* CamelContextManager. Applications either have to ensure proper initialization
* of CamelContextManager or override this method.
*
* @see CamelContextManager.
*/
protected def createExchange(pattern: ExchangePattern): Exchange = {
new DefaultExchange(CamelContextManager.context, pattern)
}
}
/**
* Synchronization object that sends responses asynchronously to initial senders. This
* class is used by Producer for asynchronous two-way messaging with a Camel endpoint.
*
* @author Martin Krasser
*/
class ProducerResponseSender(
headers: Map[String, Any],
sender: Option[Actor],
senderFuture: Option[CompletableFutureResult],
producer: Actor) extends Synchronization with Logging {
implicit val producerActor = Some(producer) // the response sender
/**
* Replies a Failure message, created from the given exchange, to <code>sender</code> (or
* <code>senderFuture</code> if applicable).
*/
def onFailure(exchange: Exchange) = {
reply(exchange.toFailureMessage(headers))
}
/**
* Replies a response Message, created from the given exchange, to <code>sender</code> (or
* <code>senderFuture</code> if applicable).
*/
def onComplete(exchange: Exchange) = {
reply(exchange.toResponseMessage(headers))
}
private def reply(message: Any) = {
sender match {
case Some(actor) => actor ! message
case None => senderFuture match {
case Some(future) => future.completeWithResult(message)
case None => log.warning("no destination for sending response")
}
}
}
}

View file

@ -11,10 +11,10 @@ import org.apache.camel.{Exchange, Consumer, Processor}
import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent}
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor}
import se.scalablesolutions.akka.camel.{CamelMessageWrapper, Message}
import se.scalablesolutions.akka.camel.{CamelMessageConversion, Message}
/**
* Camel component for interacting with actors.
* Camel component for sending messages to and receiving replies from actors.
*
* @see se.scalablesolutions.akka.camel.component.ActorEndpoint
* @see se.scalablesolutions.akka.camel.component.ActorProducer
@ -22,7 +22,6 @@ import se.scalablesolutions.akka.camel.{CamelMessageWrapper, Message}
* @author Martin Krasser
*/
class ActorComponent extends DefaultComponent {
def createEndpoint(uri: String, remaining: String, parameters: JavaMap[String, Object]): ActorEndpoint = {
val idAndUuid = idAndUuidPair(remaining)
new ActorEndpoint(uri, this, idAndUuid._1, idAndUuid._2)
@ -37,12 +36,12 @@ class ActorComponent extends DefaultComponent {
"invalid path format: %s - should be <actorid> or id:<actorid> or uuid:<actoruuid>" format remaining)
}
}
}
/**
* Camel endpoint for interacting with actors. An actor can be addressed by its
* <code>Actor.getId</code> or its <code>Actor.uuid</code> combination. Supported URI formats are
* Camel endpoint for referencing an actor. The actor reference is given by the endpoint URI.
* An actor can be referenced by its <code>Actor.getId</code> or its <code>Actor.uuid</code>.
* Supported endpoint URI formats are
* <code>actor:&lt;actorid&gt;</code>,
* <code>actor:id:&lt;actorid&gt;</code> and
* <code>actor:uuid:&lt;actoruuid&gt;</code>.
@ -53,25 +52,27 @@ class ActorComponent extends DefaultComponent {
* @author Martin Krasser
*/
class ActorEndpoint(uri: String, comp: ActorComponent, val id: Option[String], val uuid: Option[String]) extends DefaultEndpoint(uri, comp) {
/**
* @throws UnsupportedOperationException
*/
def createConsumer(processor: Processor): Consumer =
throw new UnsupportedOperationException("actor consumer not supported yet")
/**
* Creates a new ActorProducer instance initialized with this endpoint.
*/
def createProducer: ActorProducer = new ActorProducer(this)
/**
* Returns true.
*/
def isSingleton: Boolean = true
}
/**
* Sends the in-message of an exchange to an actor. If the exchange pattern is out-capable,
* the producer waits for a reply (using the !! operator), otherwise the ! operator is used
* for sending the message. Asynchronous communication is not implemented yet but will be
* added for Camel components that support the Camel Async API (like the jetty component that
* makes use of Jetty continuations).
* for sending the message.
*
* @see se.scalablesolutions.akka.camel.component.ActorComponent
* @see se.scalablesolutions.akka.camel.component.ActorEndpoint
@ -79,9 +80,17 @@ class ActorEndpoint(uri: String, comp: ActorComponent, val id: Option[String], v
* @author Martin Krasser
*/
class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
import CamelMessageConversion.toExchangeAdapter
implicit val sender = Some(new Sender)
implicit val sender = None
/**
* Depending on the exchange pattern, this method either calls processInOut or
* processInOnly for interacting with an actor. This methods looks up the actor
* from the ActorRegistry according to this producer's endpoint URI.
*
* @param exchange represents the message exchange with the actor.
*/
def process(exchange: Exchange) {
val actor = target getOrElse (throw new ActorNotRegisteredException(ep.getEndpointUri))
if (exchange.getPattern.isOutCapable)
@ -90,40 +99,29 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
processInOnly(exchange, actor)
}
override def start {
super.start
sender.get.start
}
override def stop {
sender.get.stop
super.stop
}
protected def receive = {
throw new UnsupportedOperationException
}
/**
* Send the exchange in-message to the given actor using the ! operator. The message
* send to the actor is of type se.scalablesolutions.akka.camel.Message.
*/
protected def processInOnly(exchange: Exchange, actor: Actor) {
actor ! Message(exchange.getIn)
actor ! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId))
}
/**
* Send the exchange in-message to the given actor using the !! operator. The exchange
* out-message is populated from the actor's reply message. The message sent to the
* actor is of type se.scalablesolutions.akka.camel.Message.
*/
protected def processInOut(exchange: Exchange, actor: Actor) {
import CamelMessageWrapper._
// TODO: make timeout configurable
// TODO: support asynchronous communication
// - jetty component: jetty continuations
// - file component: completion callbacks
val result: Any = actor !! Message(exchange.getIn)
val result: Any = actor !! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId))
result match {
case Some(m:Message) => {
exchange.getOut.from(m)
}
case Some(body) => {
exchange.getOut.setBody(body)
case Some(msg) => exchange.fromResponseMessage(Message.canonicalize(msg))
case None => {
// TODO: handle timeout properly
// TODO: make timeout configurable
}
}
}
@ -140,33 +138,14 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
}
private def targetByUuid(uuid: String) = ActorRegistry.actorFor(uuid)
}
/**
* Generic message sender used by ActorProducer.
*
* @author Martin Krasser
*/
private[component] class Sender extends Actor {
/**
* Ignores any message.
*/
protected def receive = {
case _ => { /* ignore any reply */ }
}
}
/**
* Thrown to indicate that an actor referenced by an endpoint URI cannot be
* Thrown to indicate that an actor referenced by an endpoint URI cannot be
* found in the ActorRegistry.
*
* @author Martin Krasser
*/
class ActorNotRegisteredException(uri: String) extends RuntimeException {
override def getMessage = "%s not registered" format uri
}

View file

@ -1,23 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel.service
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
/**
* Manages the CamelContext used by CamelService.
*
* @author Martin Krasser
*/
object CamelContextManager {
/**
* The CamelContext used by CamelService. Can be modified by applications prior to
* loading the CamelService.
*/
var context: CamelContext = new DefaultCamelContext
}

View file

@ -10,11 +10,15 @@ import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import se.scalablesolutions.akka.annotation.consume
import se.scalablesolutions.akka.camel.Consumer
import se.scalablesolutions.akka.util.{Bootable, Logging}
import se.scalablesolutions.akka.camel.{CamelContextManager, Consumer}
/**
* Started by the Kernel to expose actors as Camel endpoints.
* Started by the Kernel to expose certain actors as Camel endpoints. It uses
* se.scalablesolutions.akka.camel.CamelContextManage to create and manage the
* lifecycle of a global CamelContext. This class further uses the
* se.scalablesolutions.akka.camel.service.CamelServiceRouteBuilder to implement
* routes from Camel endpoints to actors.
*
* @see CamelRouteBuilder
*
@ -22,42 +26,35 @@ import se.scalablesolutions.akka.util.{Bootable, Logging}
*/
trait CamelService extends Bootable with Logging {
import CamelContextManager.context
import CamelContextManager._
abstract override def onLoad = {
super.onLoad
context.addRoutes(new CamelRouteBuilder)
if (!initialized) init()
context.addRoutes(new CamelServiceRouteBuilder)
context.setStreamCaching(true)
context.start
log.info("Camel context started")
start()
}
abstract override def onUnload = {
stop()
super.onUnload
context.stop
log.info("Camel context stopped")
}
}
/**
* Generic route builder that searches the registry for actors that are
* either annotated with @se.scalablesolutions.akka.annotation.consume or
* mixed in se.scalablesolutions.akka.camel.Consumer and exposes them
* as Camel endpoints.
* Implements routes from Camel endpoints to actors. It searches the registry for actors
* that are either annotated with @se.scalablesolutions.akka.annotation.consume or mix in
* se.scalablesolutions.akka.camel.Consumer and exposes them as Camel endpoints.
*
* @author Martin Krasser
*/
class CamelRouteBuilder extends RouteBuilder with Logging {
class CamelServiceRouteBuilder extends RouteBuilder with Logging {
def configure = {
val actors = ActorRegistry.actors
//
// TODO: resolve/clarify issues with ActorRegistry
// - multiple registration with same id/uuid possible
//
// TODO: avoid redundant registrations
actors.filter(isConsumeAnnotated _).foreach { actor: Actor =>
val fromUri = actor.getClass.getAnnotation(classOf[consume]).value()

View file

@ -1,24 +1,79 @@
package se.scalablesolutions.akka.camel.service
package se.scalablesolutions.akka.camel
import java.io.InputStream
import org.apache.camel.NoTypeConversionAvailableException
import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.camel.Message
import org.junit.Test
class MessageTest extends JUnitSuite {
//
// TODO: extend/rewrite unit tests
// These tests currently only ensure proper functioning of basic features.
//
@Test def shouldConvertDoubleBodyToString = {
assertEquals("1.4", new Message(1.4, null).bodyAs(classOf[String]))
CamelContextManager.init()
assertEquals("1.4", Message(1.4, null).bodyAs(classOf[String]))
}
@Test def shouldThrowExceptionWhenConvertingDoubleBodyToInputStream {
CamelContextManager.init()
intercept[NoTypeConversionAvailableException] {
new Message(1.4, null).bodyAs(classOf[InputStream])
Message(1.4, null).bodyAs(classOf[InputStream])
}
}
@Test def shouldReturnSubsetOfHeaders = {
val message = Message("test" , Map("A" -> "1", "B" -> "2"))
assertEquals(Map("B" -> "2"), message.headers(Set("B")))
}
@Test def shouldTransformBodyAndPreserveHeaders = {
assertEquals(
Message("ab", Map("A" -> "1")),
Message("a" , Map("A" -> "1")).transformBody[String](body => body + "b"))
}
@Test def shouldConvertBodyAndPreserveHeaders = {
CamelContextManager.init()
assertEquals(
Message("1.4", Map("A" -> "1")),
Message(1.4 , Map("A" -> "1")).setBodyAs(classOf[String]))
}
@Test def shouldSetBodyAndPreserveHeaders = {
assertEquals(
Message("test2" , Map("A" -> "1")),
Message("test1" , Map("A" -> "1")).setBody("test2"))
}
@Test def shouldSetHeadersAndPreserveBody = {
assertEquals(
Message("test1" , Map("C" -> "3")),
Message("test1" , Map("A" -> "1")).setHeaders(Map("C" -> "3")))
}
@Test def shouldAddHeaderAndPreserveBodyAndHeaders = {
assertEquals(
Message("test1" , Map("A" -> "1", "B" -> "2")),
Message("test1" , Map("A" -> "1")).addHeader("B" -> "2"))
}
@Test def shouldAddHeadersAndPreserveBodyAndHeaders = {
assertEquals(
Message("test1" , Map("A" -> "1", "B" -> "2")),
Message("test1" , Map("A" -> "1")).addHeaders(Map("B" -> "2")))
}
@Test def shouldRemoveHeadersAndPreserveBodyAndRemainingHeaders = {
assertEquals(
Message("test1" , Map("A" -> "1")),
Message("test1" , Map("A" -> "1", "B" -> "2")).removeHeader("B"))
}
}

View file

@ -0,0 +1,108 @@
package se.scalablesolutions.akka.camel
import org.apache.camel.{Exchange, Processor}
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.component.mock.MockEndpoint
import org.junit.Assert._
import org.junit.{Test, After, Before}
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
class ProducerTest extends JUnitSuite {
//
// TODO: extend/rewrite unit tests
// These tests currently only ensure proper functioning of basic features.
//
import CamelContextManager._
var mock: MockEndpoint = _
@Before def setUp = {
init()
context.addRoutes(new TestRouteBuilder)
start()
mock = context.getEndpoint("mock:mock", classOf[MockEndpoint])
}
@After def tearDown = {
stop()
}
//
// TODO: test replies to messages sent with ! (bang)
//
@Test def shouldProduceMessageSyncAndReceiveResponse = {
val producer = new TestProducer("direct:input2", false, false).start
val message = Message("test1", Map(Message.MessageExchangeId -> "123"))
val expected = Message("Hello test1", Map(Message.MessageExchangeId -> "123"))
assertEquals(expected, producer !! message get)
producer.stop
}
@Test def shouldProduceMessageSyncAndReceiveFailure = {
val producer = new TestProducer("direct:input2", false, false).start
val message = Message("fail", Map(Message.MessageExchangeId -> "123"))
val result = producer.!![Failure](message).get
assertEquals("failure", result.cause.getMessage)
assertEquals(Map(Message.MessageExchangeId -> "123"), result.headers)
producer.stop
}
@Test def shouldProduceMessageAsyncAndReceiveResponse = {
val producer = new TestProducer("direct:input2", true, false).start
val message = Message("test2", Map(Message.MessageExchangeId -> "124"))
val expected = Message("Hello test2", Map(Message.MessageExchangeId -> "124"))
assertEquals(expected, producer !! message get)
producer.stop
}
@Test def shouldProduceMessageAsyncAndReceiveFailure = {
val producer = new TestProducer("direct:input2", true, false).start
val message = Message("fail", Map(Message.MessageExchangeId -> "124"))
val result = producer.!![Failure](message).get
assertEquals("failure", result.cause.getMessage)
assertEquals(Map(Message.MessageExchangeId -> "124"), result.headers)
producer.stop
}
@Test def shouldProduceMessageSyncWithoutReceivingResponse = {
val producer = new TestProducer("direct:input1", false, true).start
mock.expectedBodiesReceived("test3")
producer.!("test3")(None)
producer.stop
}
@Test def shouldProduceMessageAsyncAndReceiveResponseSync = {
val producer = new TestProducer("direct:input1", true, true).start
mock.expectedBodiesReceived("test4")
producer.!("test4")(None)
producer.stop
}
class TestProducer(uri:String, prodAsync: Boolean, prodOneway: Boolean) extends Actor with Producer {
override def async = prodAsync
override def oneway = prodOneway
def endpointUri = uri
def receive = produce
}
class TestRouteBuilder extends RouteBuilder {
def configure {
from("direct:input1").to("mock:mock")
from("direct:input2").process(new Processor() {
def process(exchange: Exchange) = {
val body = exchange.getIn.getBody
body match {
case "fail" => throw new Exception("failure")
case body => exchange.getOut.setBody("Hello %s" format body)
}
}
})
}
}
}

View file

@ -3,27 +3,24 @@ package se.scalablesolutions.akka.camel.component
import org.junit._
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Message
import org.apache.camel.{CamelContext, ExchangePattern}
import org.apache.camel.impl.{DefaultExchange, SimpleRegistry, DefaultCamelContext}
import se.scalablesolutions.akka.camel.{CamelContextLifecycle, Message}
/**
* @author Martin Krasser
*/
class ActorComponentTest extends JUnitSuite {
class ActorComponentTest extends JUnitSuite with CamelContextLifecycle {
val context = new DefaultCamelContext(new SimpleRegistry)
val template = context.createProducerTemplate
//
// TODO: extend/rewrite unit tests
// These tests currently only ensure proper functioning of basic features.
//
@Before def setUp = {
context.start
template.start
init()
start()
}
@After def tearDown = {
template.stop
context.stop
stop()
}
@Test def shouldReceiveResponseFromActorReferencedById = {

View file

@ -1,19 +1,21 @@
package se.scalablesolutions.akka.camel.component
import org.apache.camel.{CamelContext, ExchangePattern}
import org.apache.camel.impl.{DefaultCamelContext, DefaultExchange}
import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Message
import org.apache.camel.impl.{DefaultCamelContext, DefaultExchange}
/**
* @author Martin Krasser
*/
class ActorProducerTest extends JUnitSuite {
//
// TODO: extend/rewrite unit tests
// These tests currently only ensure proper functioning of basic features.
//
val context = new DefaultCamelContext
val endpoint = context.getEndpoint("actor:%s" format classOf[TestActor].getName)
val producer = endpoint.createProducer

View file

@ -1,68 +1,50 @@
package se.scalablesolutions.akka.camel.service
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext
import org.junit.Assert._
import org.junit.{Before, After, Test}
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.annotation.consume
import se.scalablesolutions.akka.camel.{Message, Consumer}
import se.scalablesolutions.akka.camel.{CamelContextManager, Consumer, Message}
/**
* @author Martin Krasser
*/
class CamelServiceTest extends JUnitSuite {
class CamelServiceTest extends JUnitSuite with CamelService {
import CamelContextManager.context
//
// TODO: extend/rewrite unit tests
// These tests currently only ensure proper functioning of basic features.
//
context = new DefaultCamelContext
context.addRoutes(new TestBuilder)
import CamelContextManager._
val template = context.createProducerTemplate
var service: CamelService = _
var actor1: Actor = _
var actor2: Actor = _
var actor3: Actor = _
@Before def setUp = {
service = new CamelService {
override def onUnload = super.onUnload
override def onLoad = super.onLoad
}
actor1 = new TestActor1().start
actor2 = new TestActor2().start
actor3 = new TestActor3().start
service.onLoad
template.start
init()
context.addRoutes(new TestRouteBuilder)
onLoad
}
@After def tearDown = {
onUnload
actor1.stop
actor2.stop
actor3.stop
template.stop
service.onUnload
}
@Test def shouldReceiveResponseFromActor1ViaGeneratedRoute = {
val result = template.requestBody("direct:actor1", "Martin")
assertEquals("Hello Martin (actor1)", result)
@Test def shouldReceiveResponseViaGeneratedRoute = {
assertEquals("Hello Martin (actor1)", template.requestBody("direct:actor1", "Martin"))
assertEquals("Hello Martin (actor2)", template.requestBody("direct:actor2", "Martin"))
}
@Test def shouldReceiveResponseFromActor2ViaGeneratedRoute = {
val result = template.requestBody("direct:actor2", "Martin")
assertEquals("Hello Martin (actor2)", result)
}
@Test def shouldReceiveResponseFromActor3ViaCustomRoute = {
val result = template.requestBody("direct:actor3", "Martin")
assertEquals("Hello Tester (actor3)", result)
@Test def shouldReceiveResponseViaCustomRoute = {
assertEquals("Hello Tester (actor3)", template.requestBody("direct:actor3", "Martin"))
}
}
@ -91,7 +73,7 @@ class TestActor3 extends Actor {
}
}
class TestBuilder extends RouteBuilder {
class TestRouteBuilder extends RouteBuilder {
def configure {
val actorUri = "actor:%s" format classOf[TestActor3].getName
from("direct:actor3").transform(constant("Tester")).to("actor:actor3")

View file

@ -1,10 +1,10 @@
package sample.camel
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.{Exchange, Processor}
import se.scalablesolutions.akka.actor.SupervisorFactory
import se.scalablesolutions.akka.camel.service.CamelContextManager
import se.scalablesolutions.akka.camel.CamelContextManager
import se.scalablesolutions.akka.config.ScalaConfig._
/**
@ -12,10 +12,8 @@ import se.scalablesolutions.akka.config.ScalaConfig._
*/
class Boot {
import CamelContextManager.context
context = new DefaultCamelContext
context.addRoutes(new CustomRouteBuilder)
CamelContextManager.init()
CamelContextManager.context.addRoutes(new CustomRouteBuilder)
val factory = SupervisorFactory(
SupervisorConfig(
@ -24,13 +22,27 @@ class Boot {
Supervise(new Consumer2, LifeCycle(Permanent)) :: Nil))
factory.newInstance.start
val producer = new Producer1
val mediator = new Transformer(producer)
val consumer = new Consumer3(mediator)
producer.start
mediator.start
consumer.start
}
class CustomRouteBuilder extends RouteBuilder {
def configure {
val actorUri = "actor:%s" format classOf[Consumer2].getName
from ("jetty:http://0.0.0.0:8877/camel/test2").to(actorUri)
from("jetty:http://0.0.0.0:8877/camel/test2").to(actorUri)
from("direct:welcome").process(new Processor() {
def process(exchange: Exchange) {
exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody)
}
})
}
}

View file

@ -0,0 +1,17 @@
package sample.camel
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.{Message, Consumer}
/**
* @author Martin Krasser
*/
class Consumer3(transformer: Actor) extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"
def receive = {
case msg: Message => transformer.forward(msg.setBodyAs(classOf[String]))
}
}

View file

@ -0,0 +1,17 @@
package sample.camel
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Producer
/**
* @author Martin Krasser
*/
class Producer1 extends Actor with Producer {
def endpointUri = "direct:welcome"
override def oneway = false // default
override def async = true // default
protected def receive = produce
}

View file

@ -0,0 +1,15 @@
package sample.camel
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Message
/**
* @author Martin Krasser
*/
class Transformer(producer: Actor) extends Actor {
protected def receive = {
case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _))
}
}