diff --git a/akka-camel/src/main/scala/CamelContextLifecycle.scala b/akka-camel/src/main/scala/CamelContextLifecycle.scala new file mode 100644 index 0000000000..68e67a0612 --- /dev/null +++ b/akka-camel/src/main/scala/CamelContextLifecycle.scala @@ -0,0 +1,97 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel + +import org.apache.camel.{ProducerTemplate, CamelContext} +import org.apache.camel.impl.DefaultCamelContext + +import se.scalablesolutions.akka.util.Logging + +/** + * Defines the lifecycle of a CamelContext. Allowed state transitions are + * init -> start -> stop -> init -> ... etc. + * + * @author Martin Krasser + */ +trait CamelContextLifecycle extends Logging { + // TODO: enforce correct state transitions + // valid: init -> start -> stop -> init ... + + private var _context: CamelContext = _ + private var _template: ProducerTemplate = _ + + private var _initialized = false + private var _started = false + + /** + * Returns the managed CamelContext. + */ + protected def context: CamelContext = _context + + /** + * Returns the managed ProducerTemplate. + */ + protected def template: ProducerTemplate = _template + + /** + * Sets the managed CamelContext. + */ + protected def context_= (context: CamelContext) { _context = context } + + /** + * Sets the managed ProducerTemplate. + */ + protected def template_= (template: ProducerTemplate) { _template = template } + + def initialized = _initialized + def started = _started + + /** + * Starts the CamelContext and ProducerTemplate. + */ + def start() { + context.start + template.start + _started = true + log.info("Camel context started") + } + + /** + * Stops the CamelContext and ProducerTemplate. + */ + def stop() { + template.stop + context.stop + _initialized = false + _started = false + log.info("Camel context stopped") + } + + /** + * Initializes this lifecycle object with the a DefaultCamelContext. + */ + def init() { + init(new DefaultCamelContext) + } + + /** + * Initializes this lifecycle object with the given CamelContext. + */ + def init(context: CamelContext) { + this.context = context + this.template = context.createProducerTemplate + _initialized = true + log.info("Camel context initialized") + } +} + +/** + * Makes a global CamelContext and ProducerTemplate accessible to applications. The lifecycle + * of these objects is managed by se.scalablesolutions.akka.camel.service.CamelService. + */ +object CamelContextManager extends CamelContextLifecycle { + override def context: CamelContext = super.context + override def template: ProducerTemplate = super.template +} \ No newline at end of file diff --git a/akka-camel/src/main/scala/Consumer.scala b/akka-camel/src/main/scala/Consumer.scala index 3dbb101292..1a3003d863 100644 --- a/akka-camel/src/main/scala/Consumer.scala +++ b/akka-camel/src/main/scala/Consumer.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.camel import se.scalablesolutions.akka.actor.Actor /** - * Mixed in by Actor subclasses to be Camel endpoint consumers. + * Mixed in by Actor implementations that consume message from Camel endpoints. * * @author Martin Krasser */ diff --git a/akka-camel/src/main/scala/Message.scala b/akka-camel/src/main/scala/Message.scala index 88f810e045..db23868fac 100644 --- a/akka-camel/src/main/scala/Message.scala +++ b/akka-camel/src/main/scala/Message.scala @@ -4,58 +4,241 @@ package se.scalablesolutions.akka.camel -import org.apache.camel.{Message => CamelMessage} -import org.apache.camel.impl.DefaultCamelContext +import org.apache.camel.{Exchange, Message => CamelMessage} +import org.apache.camel.util.ExchangeHelper import scala.collection.jcl.{Map => MapWrapper} /** + * An immutable representation of a Camel message. Actor classes that mix in + * se.scalablesolutions.akka.camel.Producer or + * se.scalablesolutions.akka.camel.Consumer use this message type for communication. + * * @author Martin Krasser */ -class Message(val body: Any, val headers: Map[String, Any]) { - +case class Message(val body: Any, val headers: Map[String, Any]) { + /** + * Creates a message with a body and an empty header map. + */ def this(body: Any) = this(body, Map.empty) - def bodyAs[T](clazz: Class[T]): T = Message.converter.mandatoryConvertTo[T](clazz, body) + /** + * Returns the body of the message converted to the type given by the clazz + * argument. Conversion is done using Camel's type converter. The type converter is obtained + * from the CamelContext managed by CamelContextManager. Applications have to ensure proper + * initialization of CamelContextManager. + * + * @see CamelContextManager. + */ + def bodyAs[T](clazz: Class[T]): T = { + CamelContextManager.context.getTypeConverter.mandatoryConvertTo[T](clazz, body) + } + /** + * Returns those headers from this message whose name is contained in names. + */ + def headers(names: Set[String]): Map[String, Any] = { + headers.filter(names contains _._1) + } + + /** + * Creates a Message with a new body using a transformer function. + */ + def transformBody[A](transformer: A => Any): Message = setBody(transformer(body.asInstanceOf[A])) + + /** + * Creates a Message with a new body converted to type clazz. + * + * @see Message#bodyAs(Class) + */ + def setBodyAs[T](clazz: Class[T]): Message = setBody(bodyAs(clazz)) + + /** + * Creates a Message with a new body. + */ + def setBody(body: Any) = new Message(body, this.headers) + + /** + * Creates a new Message with new headers. + */ + def setHeaders(headers: Map[String, Any]) = new Message(this.body, headers) + + /** + * Creates a new Message with the headers argument added to the existing headers. + */ + def addHeaders(headers: Map[String, Any]) = new Message(this.body, this.headers ++ headers) + + /** + * Creates a new Message with the header argument added to the existing headers. + */ + def addHeader(header: (String, Any)) = new Message(this.body, this.headers + header) + + /** + * Creates a new Message where the header with name headerName is removed from + * the existing headers. + */ + def removeHeader(headerName: String) = new Message(this.body, this.headers - headerName) } /** + * Companion object of Message class. + * * @author Martin Krasser */ object Message { + /** + * Message header to correlate request with response messages. Applications that send + * messages to a Producer actor may want to set this header on the request message + * so that it can be correlated with an asynchronous response. Messages send to Consumer + * actors have this header already set. + */ + val MessageExchangeId = "MessageExchangeId" - val converter = new DefaultCamelContext().getTypeConverter - + /** + * Creates a new Message with body as message body and an empty header map. + */ def apply(body: Any) = new Message(body) - def apply(body: Any, headers: Map[String, Any]) = new Message(body, headers) + /** + * Creates a canonical form of the given message msg. If msg of type + * Message then msg is returned, otherwise msg is set as body of a + * newly created Message object. + */ + def canonicalize(msg: Any) = msg match { + case mobj: Message => mobj + case body => new Message(body) + } +} - def apply(cm: CamelMessage) = - new Message(cm.getBody, Map.empty ++ MapWrapper[String, AnyRef](cm.getHeaders).elements) +/** + * An immutable representation of a failed Camel exchange. It contains the failure cause + * obtained from Exchange.getException and the headers from either the Exchange.getIn + * message or Exchange.getOut message, depending on the exchange pattern. + * + * @author Martin Krasser + */ +case class Failure(val cause: Throwable, val headers: Map[String, Any]) + +/** + * Adapter for converting an org.apache.camel.Exchange to and from Message and Failure objects. + * + * @author Martin Krasser + */ +class CamelExchangeAdapter(exchange: Exchange) { + + import CamelMessageConversion.toMessageAdapter + + /** + * Sets Exchange.getIn from the given Message object. + */ + def fromRequestMessage(msg: Message): Exchange = { requestMessage.fromMessage(msg); exchange } + + /** + * Depending on the exchange pattern, sets Exchange.getIn or Exchange.getOut from the given + * Message object. If the exchange is out-capable then the Exchange.getOut is set, otherwise + * Exchange.getIn. + */ + def fromResponseMessage(msg: Message): Exchange = { responseMessage.fromMessage(msg); exchange } + + /** + * Creates a Message object from Exchange.getIn. + */ + def toRequestMessage: Message = toRequestMessage(Map.empty) + + /** + * Depending on the exchange pattern, creates a Message object from Exchange.getIn or Exchange.getOut. + * If the exchange is out-capable then the Exchange.getOut is set, otherwise Exchange.getIn. + */ + def toResponseMessage: Message = toResponseMessage(Map.empty) + + /** + * Creates a Failure object from the adapted Exchange. + * + * @see Failure + */ + def toFailureMessage: Failure = toFailureMessage(Map.empty) + + /** + * Creates a Message object from Exchange.getIn. + * + * @param headers additional headers to set on the created Message in addition to those + * in the Camel message. + */ + def toRequestMessage(headers: Map[String, Any]): Message = requestMessage.toMessage(headers) + + /** + * Depending on the exchange pattern, creates a Message object from Exchange.getIn or Exchange.getOut. + * If the exchange is out-capable then the Exchange.getOut is set, otherwise Exchange.getIn. + * + * @param headers additional headers to set on the created Message in addition to those + * in the Camel message. + */ + def toResponseMessage(headers: Map[String, Any]): Message = responseMessage.toMessage(headers) + + /** + * Creates a Failure object from the adapted Exchange. + * + * @param headers additional headers to set on the created Message in addition to those + * in the Camel message. + * + * @see Failure + */ + def toFailureMessage(headers: Map[String, Any]): Failure = Failure(exchange.getException, headers ++ responseMessage.toMessage.headers) + + private def requestMessage = exchange.getIn + + private def responseMessage = ExchangeHelper.getResultMessage(exchange) } /** - * @author Martin Krasser + * Adapter for converting an org.apache.camel.Message to and from Message objects. + * + * @author Martin Krasser */ -class CamelMessageWrapper(val cm: CamelMessage) { - - def from(m: Message): CamelMessage = { +class CamelMessageAdapter(val cm: CamelMessage) { + /** + * Set the adapted Camel message from the given Message object. + */ + def fromMessage(m: Message): CamelMessage = { cm.setBody(m.body) - for (h <- m.headers) { - cm.getHeaders.put(h._1, h._2.asInstanceOf[AnyRef]) - } + for (h <- m.headers) cm.getHeaders.put(h._1, h._2.asInstanceOf[AnyRef]) cm } + /** + * Creates a new Message object from the adapted Camel message. + */ + def toMessage: Message = toMessage(Map.empty) + + /** + * Creates a new Message object from the adapted Camel message. + * + * @param headers additional headers to set on the created Message in addition to those + * in the Camel message. + */ + def toMessage(headers: Map[String, Any]): Message = { + Message(cm.getBody, cmHeaders(headers, cm)) + } + + private def cmHeaders(headers: Map[String, Any], cm: CamelMessage) = { + headers ++ MapWrapper[String, AnyRef](cm.getHeaders).elements + } + } /** - * @author Martin Krasser + * Defines conversion methods to CamelExchangeAdapter and CamelMessageAdapter. Imported by applications + * that implicitly want to use conversion methods of CamelExchangeAdapter and CamelMessageAdapter. */ -object CamelMessageWrapper { - - implicit def wrapCamelMessage(cm: CamelMessage): CamelMessageWrapper = new CamelMessageWrapper(cm) +object CamelMessageConversion { + /** + * Creates an CamelExchangeAdapter for the given Camel exchange. + */ + implicit def toExchangeAdapter(ce: Exchange): CamelExchangeAdapter = new CamelExchangeAdapter(ce) + /** + * Creates an CamelMessageAdapter for the given Camel message. + */ + implicit def toMessageAdapter(cm: CamelMessage): CamelMessageAdapter = new CamelMessageAdapter(cm) } \ No newline at end of file diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala new file mode 100644 index 0000000000..326f208cfc --- /dev/null +++ b/akka-camel/src/main/scala/Producer.scala @@ -0,0 +1,199 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel + +import CamelMessageConversion.toExchangeAdapter + +import org.apache.camel.{Processor, ExchangePattern, Exchange, ProducerTemplate} +import org.apache.camel.impl.DefaultExchange +import org.apache.camel.spi.Synchronization + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.dispatch.CompletableFutureResult +import se.scalablesolutions.akka.util.Logging + +/** + * Mixed in by Actor implementations that produce messages to Camel endpoints. + * + * @author Martin Krasser + */ +trait Producer { + + self: Actor => + + /** + * If set to true (default), communication with the Camel endpoint is done via the Camel + * Async API. Camel then processes the + * message in a separate thread. If set to false, the actor thread is blocked until Camel + * has finished processing the produced message. + */ + def async: Boolean = true + + /** + * If set to false (default), this producer expects a response message from the Camel endpoint. + * If set to true, this producer communicates with the Camel endpoint with an in-only message + * exchange pattern (fire and forget). + */ + def oneway: Boolean = false + + /** + * Returns the Camel endpoint URI to produce messages to. + */ + def endpointUri: String + + /** + * Returns the names of message headers to copy from a request message to a response message. + * By default only the Message.MessageExchangeId is copied. Applications may override this to + * define an application-specific set of message headers to copy. + */ + def headersToCopy: Set[String] = Set(Message.MessageExchangeId) + + /** + * Returns the producer template from the CamelContextManager. Applications either have to ensure + * proper initialization of CamelContextManager or override this method. + * + * @see CamelContextManager. + */ + protected def template: ProducerTemplate = CamelContextManager.template + + /** + * Initiates a one-way (in-only) message exchange to the Camel endpoint given by + * endpointUri. This method blocks until Camel finishes processing + * the message exchange. + * + * @param msg: the message to produce. The message is converted to its canonical + * representation via Message.canonicalize. + */ + protected def produceOneway(msg: Any): Unit = { + template.send(endpointUri, createInOnlyExchange.fromRequestMessage(Message.canonicalize(msg))) + } + + /** + * Initiates a one-way (in-only) message exchange to the Camel endpoint given by + * endpointUri. This method triggers asynchronous processing of the + * message exchange by Camel. + * + * @param msg: the message to produce. The message is converted to its canonical + * representation via Message.canonicalize. + */ + protected def produceOnewayAsync(msg: Any): Unit = { + template.asyncSend(endpointUri, createInOnlyExchange.fromRequestMessage(Message.canonicalize(msg))) + } + + /** + * Initiates a two-way (in-out) message exchange to the Camel endpoint given by + * endpointUri. This method blocks until Camel finishes processing + * the message exchange. + * + * @param msg: the message to produce. The message is converted to its canonical + * representation via Message.canonicalize. + * @return either a response Message or a Failure object. + */ + protected def produce(msg: Any): Any = { + val cmsg = Message.canonicalize(msg) + val requestProcessor = new Processor() { + def process(exchange: Exchange) = exchange.fromRequestMessage(cmsg) + } + val result = template.request(endpointUri, requestProcessor) + if (result.isFailed) + result.toFailureMessage(cmsg.headers(headersToCopy)) + else + result.toResponseMessage(cmsg.headers(headersToCopy)) + } + + /** + * Initiates a two-way (in-out) message exchange to the Camel endpoint given by + * endpointUri. This method triggers asynchronous processing of the + * message exchange by Camel. The response message is returned asynchronously to + * the original sender (or sender future). + * + * @param msg: the message to produce. The message is converted to its canonical + * representation via Message.canonicalize. + * @return either a response Message or a Failure object. + * @see ProducerResponseSender + */ + protected def produceAsync(msg: Any): Unit = { + val cmsg = Message.canonicalize(msg) + val sync = new ProducerResponseSender(cmsg.headers(headersToCopy), this.sender, this.senderFuture, this) + template.asyncCallback(endpointUri, createInOutExchange.fromRequestMessage(cmsg), sync) + } + + /** + * Default implementation for Actor.receive. Implementors may choose to + * def receive = produce. This partial function calls one of + * the protected produce methods depending on the return values of + * oneway and async. + */ + protected def produce: PartialFunction[Any, Unit] = { + case msg => { + if ( oneway && !async) produceOneway(msg) + else if ( oneway && async) produceOnewayAsync(msg) + else if (!oneway && !async) reply(produce(msg)) + else /*(!oneway && async)*/ produceAsync(msg) + } + } + + /** + * Creates a new in-only Exchange. + */ + protected def createInOnlyExchange: Exchange = createExchange(ExchangePattern.InOnly) + + /** + * Creates a new in-out Exchange. + */ + protected def createInOutExchange: Exchange = createExchange(ExchangePattern.InOut) + + /** + * Creates a new Exchange with given pattern from the CamelContext managed by + * CamelContextManager. Applications either have to ensure proper initialization + * of CamelContextManager or override this method. + * + * @see CamelContextManager. + */ + protected def createExchange(pattern: ExchangePattern): Exchange = { + new DefaultExchange(CamelContextManager.context, pattern) + } +} + +/** + * Synchronization object that sends responses asynchronously to initial senders. This + * class is used by Producer for asynchronous two-way messaging with a Camel endpoint. + * + * @author Martin Krasser + */ +class ProducerResponseSender( + headers: Map[String, Any], + sender: Option[Actor], + senderFuture: Option[CompletableFutureResult], + producer: Actor) extends Synchronization with Logging { + + implicit val producerActor = Some(producer) // the response sender + + /** + * Replies a Failure message, created from the given exchange, to sender (or + * senderFuture if applicable). + */ + def onFailure(exchange: Exchange) = { + reply(exchange.toFailureMessage(headers)) + } + + /** + * Replies a response Message, created from the given exchange, to sender (or + * senderFuture if applicable). + */ + def onComplete(exchange: Exchange) = { + reply(exchange.toResponseMessage(headers)) + } + + private def reply(message: Any) = { + sender match { + case Some(actor) => actor ! message + case None => senderFuture match { + case Some(future) => future.completeWithResult(message) + case None => log.warning("no destination for sending response") + } + } + } +} diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index d59a4261c9..71db14021a 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -11,10 +11,10 @@ import org.apache.camel.{Exchange, Consumer, Processor} import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent} import se.scalablesolutions.akka.actor.{ActorRegistry, Actor} -import se.scalablesolutions.akka.camel.{CamelMessageWrapper, Message} +import se.scalablesolutions.akka.camel.{CamelMessageConversion, Message} /** - * Camel component for interacting with actors. + * Camel component for sending messages to and receiving replies from actors. * * @see se.scalablesolutions.akka.camel.component.ActorEndpoint * @see se.scalablesolutions.akka.camel.component.ActorProducer @@ -22,7 +22,6 @@ import se.scalablesolutions.akka.camel.{CamelMessageWrapper, Message} * @author Martin Krasser */ class ActorComponent extends DefaultComponent { - def createEndpoint(uri: String, remaining: String, parameters: JavaMap[String, Object]): ActorEndpoint = { val idAndUuid = idAndUuidPair(remaining) new ActorEndpoint(uri, this, idAndUuid._1, idAndUuid._2) @@ -37,12 +36,12 @@ class ActorComponent extends DefaultComponent { "invalid path format: %s - should be or id: or uuid:" format remaining) } } - } /** - * Camel endpoint for interacting with actors. An actor can be addressed by its - * Actor.getId or its Actor.uuid combination. Supported URI formats are + * Camel endpoint for referencing an actor. The actor reference is given by the endpoint URI. + * An actor can be referenced by its Actor.getId or its Actor.uuid. + * Supported endpoint URI formats are * actor:<actorid>, * actor:id:<actorid> and * actor:uuid:<actoruuid>. @@ -53,25 +52,27 @@ class ActorComponent extends DefaultComponent { * @author Martin Krasser */ class ActorEndpoint(uri: String, comp: ActorComponent, val id: Option[String], val uuid: Option[String]) extends DefaultEndpoint(uri, comp) { - /** * @throws UnsupportedOperationException */ def createConsumer(processor: Processor): Consumer = throw new UnsupportedOperationException("actor consumer not supported yet") + /** + * Creates a new ActorProducer instance initialized with this endpoint. + */ def createProducer: ActorProducer = new ActorProducer(this) + /** + * Returns true. + */ def isSingleton: Boolean = true - } /** * Sends the in-message of an exchange to an actor. If the exchange pattern is out-capable, * the producer waits for a reply (using the !! operator), otherwise the ! operator is used - * for sending the message. Asynchronous communication is not implemented yet but will be - * added for Camel components that support the Camel Async API (like the jetty component that - * makes use of Jetty continuations). + * for sending the message. * * @see se.scalablesolutions.akka.camel.component.ActorComponent * @see se.scalablesolutions.akka.camel.component.ActorEndpoint @@ -79,9 +80,17 @@ class ActorEndpoint(uri: String, comp: ActorComponent, val id: Option[String], v * @author Martin Krasser */ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) { + import CamelMessageConversion.toExchangeAdapter - implicit val sender = Some(new Sender) + implicit val sender = None + /** + * Depending on the exchange pattern, this method either calls processInOut or + * processInOnly for interacting with an actor. This methods looks up the actor + * from the ActorRegistry according to this producer's endpoint URI. + * + * @param exchange represents the message exchange with the actor. + */ def process(exchange: Exchange) { val actor = target getOrElse (throw new ActorNotRegisteredException(ep.getEndpointUri)) if (exchange.getPattern.isOutCapable) @@ -90,40 +99,29 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) { processInOnly(exchange, actor) } - override def start { - super.start - sender.get.start - } - - override def stop { - sender.get.stop - super.stop - } - - protected def receive = { - throw new UnsupportedOperationException - } - + /** + * Send the exchange in-message to the given actor using the ! operator. The message + * send to the actor is of type se.scalablesolutions.akka.camel.Message. + */ protected def processInOnly(exchange: Exchange, actor: Actor) { - actor ! Message(exchange.getIn) + actor ! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId)) } + /** + * Send the exchange in-message to the given actor using the !! operator. The exchange + * out-message is populated from the actor's reply message. The message sent to the + * actor is of type se.scalablesolutions.akka.camel.Message. + */ protected def processInOut(exchange: Exchange, actor: Actor) { - import CamelMessageWrapper._ - - // TODO: make timeout configurable // TODO: support asynchronous communication - // - jetty component: jetty continuations - // - file component: completion callbacks - val result: Any = actor !! Message(exchange.getIn) + val result: Any = actor !! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId)) result match { - case Some(m:Message) => { - exchange.getOut.from(m) - } - case Some(body) => { - exchange.getOut.setBody(body) + case Some(msg) => exchange.fromResponseMessage(Message.canonicalize(msg)) + case None => { + // TODO: handle timeout properly + // TODO: make timeout configurable } } } @@ -140,33 +138,14 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) { } private def targetByUuid(uuid: String) = ActorRegistry.actorFor(uuid) - } /** - * Generic message sender used by ActorProducer. - * - * @author Martin Krasser - */ -private[component] class Sender extends Actor { - - /** - * Ignores any message. - */ - protected def receive = { - case _ => { /* ignore any reply */ } - } - -} - -/** - * Thrown to indicate that an actor referenced by an endpoint URI cannot be + * Thrown to indicate that an actor referenced by an endpoint URI cannot be * found in the ActorRegistry. * * @author Martin Krasser */ class ActorNotRegisteredException(uri: String) extends RuntimeException { - override def getMessage = "%s not registered" format uri - } \ No newline at end of file diff --git a/akka-camel/src/main/scala/service/CamelContextManager.scala b/akka-camel/src/main/scala/service/CamelContextManager.scala deleted file mode 100644 index a6f84c158c..0000000000 --- a/akka-camel/src/main/scala/service/CamelContextManager.scala +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.camel.service - -import org.apache.camel.CamelContext -import org.apache.camel.impl.DefaultCamelContext - -/** - * Manages the CamelContext used by CamelService. - * - * @author Martin Krasser - */ -object CamelContextManager { - - /** - * The CamelContext used by CamelService. Can be modified by applications prior to - * loading the CamelService. - */ - var context: CamelContext = new DefaultCamelContext - -} \ No newline at end of file diff --git a/akka-camel/src/main/scala/service/CamelService.scala b/akka-camel/src/main/scala/service/CamelService.scala index 98a767797b..aa4a54ef12 100644 --- a/akka-camel/src/main/scala/service/CamelService.scala +++ b/akka-camel/src/main/scala/service/CamelService.scala @@ -10,11 +10,15 @@ import org.apache.camel.builder.RouteBuilder import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} import se.scalablesolutions.akka.annotation.consume -import se.scalablesolutions.akka.camel.Consumer import se.scalablesolutions.akka.util.{Bootable, Logging} +import se.scalablesolutions.akka.camel.{CamelContextManager, Consumer} /** - * Started by the Kernel to expose actors as Camel endpoints. + * Started by the Kernel to expose certain actors as Camel endpoints. It uses + * se.scalablesolutions.akka.camel.CamelContextManage to create and manage the + * lifecycle of a global CamelContext. This class further uses the + * se.scalablesolutions.akka.camel.service.CamelServiceRouteBuilder to implement + * routes from Camel endpoints to actors. * * @see CamelRouteBuilder * @@ -22,42 +26,35 @@ import se.scalablesolutions.akka.util.{Bootable, Logging} */ trait CamelService extends Bootable with Logging { - import CamelContextManager.context + import CamelContextManager._ abstract override def onLoad = { super.onLoad - context.addRoutes(new CamelRouteBuilder) + if (!initialized) init() + context.addRoutes(new CamelServiceRouteBuilder) context.setStreamCaching(true) - context.start - log.info("Camel context started") + start() } abstract override def onUnload = { + stop() super.onUnload - context.stop - log.info("Camel context stopped") } } /** - * Generic route builder that searches the registry for actors that are - * either annotated with @se.scalablesolutions.akka.annotation.consume or - * mixed in se.scalablesolutions.akka.camel.Consumer and exposes them - * as Camel endpoints. + * Implements routes from Camel endpoints to actors. It searches the registry for actors + * that are either annotated with @se.scalablesolutions.akka.annotation.consume or mix in + * se.scalablesolutions.akka.camel.Consumer and exposes them as Camel endpoints. * * @author Martin Krasser */ -class CamelRouteBuilder extends RouteBuilder with Logging { +class CamelServiceRouteBuilder extends RouteBuilder with Logging { def configure = { val actors = ActorRegistry.actors - // - // TODO: resolve/clarify issues with ActorRegistry - // - multiple registration with same id/uuid possible - // - // TODO: avoid redundant registrations actors.filter(isConsumeAnnotated _).foreach { actor: Actor => val fromUri = actor.getClass.getAnnotation(classOf[consume]).value() diff --git a/akka-camel/src/test/scala/MessageTest.scala b/akka-camel/src/test/scala/MessageTest.scala index 791f243ee4..850119b5aa 100644 --- a/akka-camel/src/test/scala/MessageTest.scala +++ b/akka-camel/src/test/scala/MessageTest.scala @@ -1,24 +1,79 @@ -package se.scalablesolutions.akka.camel.service +package se.scalablesolutions.akka.camel import java.io.InputStream import org.apache.camel.NoTypeConversionAvailableException import org.junit.Assert._ -import org.junit.Test import org.scalatest.junit.JUnitSuite -import se.scalablesolutions.akka.camel.Message +import org.junit.Test class MessageTest extends JUnitSuite { + // + // TODO: extend/rewrite unit tests + // These tests currently only ensure proper functioning of basic features. + // + @Test def shouldConvertDoubleBodyToString = { - assertEquals("1.4", new Message(1.4, null).bodyAs(classOf[String])) + CamelContextManager.init() + assertEquals("1.4", Message(1.4, null).bodyAs(classOf[String])) } @Test def shouldThrowExceptionWhenConvertingDoubleBodyToInputStream { + CamelContextManager.init() intercept[NoTypeConversionAvailableException] { - new Message(1.4, null).bodyAs(classOf[InputStream]) + Message(1.4, null).bodyAs(classOf[InputStream]) } } + @Test def shouldReturnSubsetOfHeaders = { + val message = Message("test" , Map("A" -> "1", "B" -> "2")) + assertEquals(Map("B" -> "2"), message.headers(Set("B"))) + } + + @Test def shouldTransformBodyAndPreserveHeaders = { + assertEquals( + Message("ab", Map("A" -> "1")), + Message("a" , Map("A" -> "1")).transformBody[String](body => body + "b")) + } + + @Test def shouldConvertBodyAndPreserveHeaders = { + CamelContextManager.init() + assertEquals( + Message("1.4", Map("A" -> "1")), + Message(1.4 , Map("A" -> "1")).setBodyAs(classOf[String])) + } + + @Test def shouldSetBodyAndPreserveHeaders = { + assertEquals( + Message("test2" , Map("A" -> "1")), + Message("test1" , Map("A" -> "1")).setBody("test2")) + } + + @Test def shouldSetHeadersAndPreserveBody = { + assertEquals( + Message("test1" , Map("C" -> "3")), + Message("test1" , Map("A" -> "1")).setHeaders(Map("C" -> "3"))) + + } + + @Test def shouldAddHeaderAndPreserveBodyAndHeaders = { + assertEquals( + Message("test1" , Map("A" -> "1", "B" -> "2")), + Message("test1" , Map("A" -> "1")).addHeader("B" -> "2")) + } + + @Test def shouldAddHeadersAndPreserveBodyAndHeaders = { + assertEquals( + Message("test1" , Map("A" -> "1", "B" -> "2")), + Message("test1" , Map("A" -> "1")).addHeaders(Map("B" -> "2"))) + } + + @Test def shouldRemoveHeadersAndPreserveBodyAndRemainingHeaders = { + assertEquals( + Message("test1" , Map("A" -> "1")), + Message("test1" , Map("A" -> "1", "B" -> "2")).removeHeader("B")) + } + } \ No newline at end of file diff --git a/akka-camel/src/test/scala/ProducerTest.scala b/akka-camel/src/test/scala/ProducerTest.scala new file mode 100644 index 0000000000..1a69316836 --- /dev/null +++ b/akka-camel/src/test/scala/ProducerTest.scala @@ -0,0 +1,108 @@ +package se.scalablesolutions.akka.camel + +import org.apache.camel.{Exchange, Processor} +import org.apache.camel.builder.RouteBuilder +import org.apache.camel.component.mock.MockEndpoint +import org.junit.Assert._ +import org.junit.{Test, After, Before} +import org.scalatest.junit.JUnitSuite + +import se.scalablesolutions.akka.actor.Actor + +class ProducerTest extends JUnitSuite { + + // + // TODO: extend/rewrite unit tests + // These tests currently only ensure proper functioning of basic features. + // + + import CamelContextManager._ + + var mock: MockEndpoint = _ + + @Before def setUp = { + init() + context.addRoutes(new TestRouteBuilder) + start() + mock = context.getEndpoint("mock:mock", classOf[MockEndpoint]) + } + + @After def tearDown = { + stop() + } + + // + // TODO: test replies to messages sent with ! (bang) + // + + @Test def shouldProduceMessageSyncAndReceiveResponse = { + val producer = new TestProducer("direct:input2", false, false).start + val message = Message("test1", Map(Message.MessageExchangeId -> "123")) + val expected = Message("Hello test1", Map(Message.MessageExchangeId -> "123")) + assertEquals(expected, producer !! message get) + producer.stop + } + + @Test def shouldProduceMessageSyncAndReceiveFailure = { + val producer = new TestProducer("direct:input2", false, false).start + val message = Message("fail", Map(Message.MessageExchangeId -> "123")) + val result = producer.!![Failure](message).get + assertEquals("failure", result.cause.getMessage) + assertEquals(Map(Message.MessageExchangeId -> "123"), result.headers) + producer.stop + } + + @Test def shouldProduceMessageAsyncAndReceiveResponse = { + val producer = new TestProducer("direct:input2", true, false).start + val message = Message("test2", Map(Message.MessageExchangeId -> "124")) + val expected = Message("Hello test2", Map(Message.MessageExchangeId -> "124")) + assertEquals(expected, producer !! message get) + producer.stop + } + + @Test def shouldProduceMessageAsyncAndReceiveFailure = { + val producer = new TestProducer("direct:input2", true, false).start + val message = Message("fail", Map(Message.MessageExchangeId -> "124")) + val result = producer.!![Failure](message).get + assertEquals("failure", result.cause.getMessage) + assertEquals(Map(Message.MessageExchangeId -> "124"), result.headers) + producer.stop + } + + @Test def shouldProduceMessageSyncWithoutReceivingResponse = { + val producer = new TestProducer("direct:input1", false, true).start + mock.expectedBodiesReceived("test3") + producer.!("test3")(None) + producer.stop + } + + @Test def shouldProduceMessageAsyncAndReceiveResponseSync = { + val producer = new TestProducer("direct:input1", true, true).start + mock.expectedBodiesReceived("test4") + producer.!("test4")(None) + producer.stop + } + + class TestProducer(uri:String, prodAsync: Boolean, prodOneway: Boolean) extends Actor with Producer { + override def async = prodAsync + override def oneway = prodOneway + def endpointUri = uri + def receive = produce + } + + class TestRouteBuilder extends RouteBuilder { + def configure { + from("direct:input1").to("mock:mock") + from("direct:input2").process(new Processor() { + def process(exchange: Exchange) = { + val body = exchange.getIn.getBody + body match { + case "fail" => throw new Exception("failure") + case body => exchange.getOut.setBody("Hello %s" format body) + } + } + }) + } + } + +} \ No newline at end of file diff --git a/akka-camel/src/test/scala/component/ActorComponentTest.scala b/akka-camel/src/test/scala/component/ActorComponentTest.scala index 30ea7d1a5b..379349da7a 100644 --- a/akka-camel/src/test/scala/component/ActorComponentTest.scala +++ b/akka-camel/src/test/scala/component/ActorComponentTest.scala @@ -3,27 +3,24 @@ package se.scalablesolutions.akka.camel.component import org.junit._ import org.junit.Assert._ import org.scalatest.junit.JUnitSuite + import se.scalablesolutions.akka.actor.Actor -import se.scalablesolutions.akka.camel.Message -import org.apache.camel.{CamelContext, ExchangePattern} -import org.apache.camel.impl.{DefaultExchange, SimpleRegistry, DefaultCamelContext} +import se.scalablesolutions.akka.camel.{CamelContextLifecycle, Message} -/** - * @author Martin Krasser - */ -class ActorComponentTest extends JUnitSuite { +class ActorComponentTest extends JUnitSuite with CamelContextLifecycle { - val context = new DefaultCamelContext(new SimpleRegistry) - val template = context.createProducerTemplate + // + // TODO: extend/rewrite unit tests + // These tests currently only ensure proper functioning of basic features. + // @Before def setUp = { - context.start - template.start + init() + start() } @After def tearDown = { - template.stop - context.stop + stop() } @Test def shouldReceiveResponseFromActorReferencedById = { diff --git a/akka-camel/src/test/scala/component/ActorProducerTest.scala b/akka-camel/src/test/scala/component/ActorProducerTest.scala index 73e51ebb04..954a4d21cd 100644 --- a/akka-camel/src/test/scala/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/component/ActorProducerTest.scala @@ -1,19 +1,21 @@ package se.scalablesolutions.akka.camel.component import org.apache.camel.{CamelContext, ExchangePattern} +import org.apache.camel.impl.{DefaultCamelContext, DefaultExchange} import org.junit.Assert._ import org.junit.Test import org.scalatest.junit.JUnitSuite import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.camel.Message -import org.apache.camel.impl.{DefaultCamelContext, DefaultExchange} -/** - * @author Martin Krasser - */ class ActorProducerTest extends JUnitSuite { + // + // TODO: extend/rewrite unit tests + // These tests currently only ensure proper functioning of basic features. + // + val context = new DefaultCamelContext val endpoint = context.getEndpoint("actor:%s" format classOf[TestActor].getName) val producer = endpoint.createProducer diff --git a/akka-camel/src/test/scala/service/CamelServiceTest.scala b/akka-camel/src/test/scala/service/CamelServiceTest.scala index 52f6d1fd04..58ccc76273 100644 --- a/akka-camel/src/test/scala/service/CamelServiceTest.scala +++ b/akka-camel/src/test/scala/service/CamelServiceTest.scala @@ -1,68 +1,50 @@ package se.scalablesolutions.akka.camel.service import org.apache.camel.builder.RouteBuilder -import org.apache.camel.impl.DefaultCamelContext import org.junit.Assert._ import org.junit.{Before, After, Test} import org.scalatest.junit.JUnitSuite import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.annotation.consume -import se.scalablesolutions.akka.camel.{Message, Consumer} +import se.scalablesolutions.akka.camel.{CamelContextManager, Consumer, Message} -/** - * @author Martin Krasser - */ -class CamelServiceTest extends JUnitSuite { +class CamelServiceTest extends JUnitSuite with CamelService { - import CamelContextManager.context + // + // TODO: extend/rewrite unit tests + // These tests currently only ensure proper functioning of basic features. + // - context = new DefaultCamelContext - context.addRoutes(new TestBuilder) + import CamelContextManager._ - val template = context.createProducerTemplate - var service: CamelService = _ var actor1: Actor = _ var actor2: Actor = _ var actor3: Actor = _ @Before def setUp = { - service = new CamelService { - override def onUnload = super.onUnload - override def onLoad = super.onLoad - } - actor1 = new TestActor1().start actor2 = new TestActor2().start actor3 = new TestActor3().start - - service.onLoad - template.start - + init() + context.addRoutes(new TestRouteBuilder) + onLoad } @After def tearDown = { + onUnload actor1.stop actor2.stop actor3.stop - - template.stop - service.onUnload } - @Test def shouldReceiveResponseFromActor1ViaGeneratedRoute = { - val result = template.requestBody("direct:actor1", "Martin") - assertEquals("Hello Martin (actor1)", result) + @Test def shouldReceiveResponseViaGeneratedRoute = { + assertEquals("Hello Martin (actor1)", template.requestBody("direct:actor1", "Martin")) + assertEquals("Hello Martin (actor2)", template.requestBody("direct:actor2", "Martin")) } - @Test def shouldReceiveResponseFromActor2ViaGeneratedRoute = { - val result = template.requestBody("direct:actor2", "Martin") - assertEquals("Hello Martin (actor2)", result) - } - - @Test def shouldReceiveResponseFromActor3ViaCustomRoute = { - val result = template.requestBody("direct:actor3", "Martin") - assertEquals("Hello Tester (actor3)", result) + @Test def shouldReceiveResponseViaCustomRoute = { + assertEquals("Hello Tester (actor3)", template.requestBody("direct:actor3", "Martin")) } } @@ -91,7 +73,7 @@ class TestActor3 extends Actor { } } -class TestBuilder extends RouteBuilder { +class TestRouteBuilder extends RouteBuilder { def configure { val actorUri = "actor:%s" format classOf[TestActor3].getName from("direct:actor3").transform(constant("Tester")).to("actor:actor3") diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala index 0b3726c08b..b23c99dafa 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala @@ -1,10 +1,10 @@ package sample.camel import org.apache.camel.builder.RouteBuilder -import org.apache.camel.impl.DefaultCamelContext +import org.apache.camel.{Exchange, Processor} import se.scalablesolutions.akka.actor.SupervisorFactory -import se.scalablesolutions.akka.camel.service.CamelContextManager +import se.scalablesolutions.akka.camel.CamelContextManager import se.scalablesolutions.akka.config.ScalaConfig._ /** @@ -12,10 +12,8 @@ import se.scalablesolutions.akka.config.ScalaConfig._ */ class Boot { - import CamelContextManager.context - - context = new DefaultCamelContext - context.addRoutes(new CustomRouteBuilder) + CamelContextManager.init() + CamelContextManager.context.addRoutes(new CustomRouteBuilder) val factory = SupervisorFactory( SupervisorConfig( @@ -24,13 +22,27 @@ class Boot { Supervise(new Consumer2, LifeCycle(Permanent)) :: Nil)) factory.newInstance.start + val producer = new Producer1 + val mediator = new Transformer(producer) + val consumer = new Consumer3(mediator) + + producer.start + mediator.start + consumer.start + } class CustomRouteBuilder extends RouteBuilder { def configure { val actorUri = "actor:%s" format classOf[Consumer2].getName - from ("jetty:http://0.0.0.0:8877/camel/test2").to(actorUri) + from("jetty:http://0.0.0.0:8877/camel/test2").to(actorUri) + from("direct:welcome").process(new Processor() { + def process(exchange: Exchange) { + exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody) + } + }) + } } \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Consumer3.scala b/akka-samples/akka-sample-camel/src/main/scala/Consumer3.scala new file mode 100644 index 0000000000..39cf1f0652 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/Consumer3.scala @@ -0,0 +1,17 @@ +package sample.camel + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.camel.{Message, Consumer} + +/** + * @author Martin Krasser + */ +class Consumer3(transformer: Actor) extends Actor with Consumer { + + def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome" + + def receive = { + case msg: Message => transformer.forward(msg.setBodyAs(classOf[String])) + } + +} diff --git a/akka-samples/akka-sample-camel/src/main/scala/Producer1.scala b/akka-samples/akka-sample-camel/src/main/scala/Producer1.scala new file mode 100644 index 0000000000..11151a58df --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/Producer1.scala @@ -0,0 +1,17 @@ +package sample.camel + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.camel.Producer + +/** + * @author Martin Krasser + */ +class Producer1 extends Actor with Producer { + + def endpointUri = "direct:welcome" + + override def oneway = false // default + override def async = true // default + + protected def receive = produce +} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/scala/Transformer.scala b/akka-samples/akka-sample-camel/src/main/scala/Transformer.scala new file mode 100644 index 0000000000..0df05c594c --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/Transformer.scala @@ -0,0 +1,15 @@ +package sample.camel + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.camel.Message + +/** + * @author Martin Krasser + */ +class Transformer(producer: Actor) extends Actor { + + protected def receive = { + case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _)) + } + +} \ No newline at end of file