2011-05-23 11:37:56 -04:00
|
|
|
/**
|
2012-01-19 14:38:44 +00:00
|
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
2011-05-23 11:37:56 -04:00
|
|
|
*/
|
2012-01-19 14:38:44 +00:00
|
|
|
|
2011-05-23 11:37:56 -04:00
|
|
|
package akka.camel
|
|
|
|
|
|
|
|
|
|
import java.util.{ Map ⇒ JMap, Set ⇒ JSet }
|
|
|
|
|
|
|
|
|
|
import scala.collection.JavaConversions._
|
|
|
|
|
|
|
|
|
|
import akka.japi.{ Function ⇒ JFunction }
|
2012-03-01 17:32:10 +01:00
|
|
|
import org.apache.camel.{ CamelContext, Message ⇒ JCamelMessage }
|
2012-05-07 14:18:06 +02:00
|
|
|
import akka.AkkaException
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* An immutable representation of a Camel message.
|
|
|
|
|
*
|
|
|
|
|
* @author Martin Krasser
|
|
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
case class CamelMessage(body: Any, headers: Map[String, Any]) {
|
2011-05-23 11:37:56 -04:00
|
|
|
|
2012-01-19 14:38:44 +00:00
|
|
|
def this(body: Any, headers: JMap[String, Any]) =
  // for Java: copy the entries of the java.util.Map into an immutable Scala map
  this(body, Map.empty[String, Any] ++ headers)
|
2011-05-23 11:37:56 -04:00
|
|
|
|
2012-05-23 15:17:49 +02:00
|
|
|
override def toString: String = {
  // Renders as CamelMessage(<body>, <headers>).
  "CamelMessage(%s, %s)".format(body, headers)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns those headers from this message whose name is contained in <code>names</code>.
|
|
|
|
|
*/
|
2012-05-23 15:17:49 +02:00
|
|
|
def headers(names: Set[String]): Map[String, Any] = {
  // Build a real immutable map holding only the requested entries. `filterKeys`
  // merely wraps the complete headers map and re-applies the predicate on every
  // access, so the result would keep all headers reachable.
  // NOTE(review): the `filterKeys` wrapper is also reported not to be serializable
  // in older Scala versions - confirm whether that matters for remote messages.
  this.headers filter { case (headerName, _) ⇒ names contains headerName }
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns those headers from this message whose name is contained in <code>names</code>.
|
|
|
|
|
* The returned headers map is backed by an immutable headers map. Any attempt to modify
|
|
|
|
|
* the returned map will throw an exception.
|
|
|
|
|
* <p>
|
|
|
|
|
* Java API
|
|
|
|
|
*/
|
2012-01-19 14:38:44 +00:00
|
|
|
def getHeaders(names: JSet[String]): JMap[String, Any] = {
  // Snapshot the Java set as an immutable Scala set and delegate to the Scala API.
  val wanted: Set[String] = names.toSet
  // The Scala result is converted to a java.util.Map by the imported JavaConversions implicits.
  headers(wanted)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns all headers from this message. The returned headers map is backed by this
|
|
|
|
|
* message's immutable headers map. Any attempt to modify the returned map will throw an
|
|
|
|
|
* exception.
|
|
|
|
|
* <p>
|
|
|
|
|
* Java API
|
|
|
|
|
*/
|
|
|
|
|
def getHeaders: JMap[String, Any] = {
  // Converted to a java.util.Map by the imported JavaConversions implicits.
  val all: Map[String, Any] = this.headers
  all
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns the header with given <code>name</code> as an <code>Option</code>. The result is <code>None</code>
|
|
|
|
|
* if the header doesn't exist.
|
|
|
|
|
*/
|
2012-01-19 14:38:44 +00:00
|
|
|
def header(name: String): Option[Any] = {
  // Some(value) when the header is present, None otherwise.
  this.headers get name
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns the header with given <code>name</code>. Throws <code>NoSuchElementException</code>
|
|
|
|
|
* if the header doesn't exist.
|
|
|
|
|
* <p>
|
|
|
|
|
* Java API
|
|
|
|
|
*/
|
2012-01-19 14:38:44 +00:00
|
|
|
def getHeader(name: String): Any = {
  // Map.apply throws NoSuchElementException for an unknown header name.
  this.headers.apply(name)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a CamelMessage with a transformed body using a <code>transformer</code> function.
|
2011-05-23 11:37:56 -04:00
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def mapBody[A, B](transformer: A ⇒ B): CamelMessage = {
  // Unchecked cast: the caller vouches that the current body is an A.
  val input = body.asInstanceOf[A]
  val transformed: B = transformer(input)
  withBody(transformed)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a CamelMessage with a transformed body using a <code>transformer</code> function.
|
2011-05-23 11:37:56 -04:00
|
|
|
* <p>
|
|
|
|
|
* Java API
|
|
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def mapBody[A, B](transformer: JFunction[A, B]): CamelMessage = {
  // Unchecked cast: the caller vouches that the current body is an A.
  val input = body.asInstanceOf[A]
  val transformed: B = transformer.apply(input)
  withBody(transformed)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a CamelMessage with a given <code>body</code>.
|
2011-05-23 11:37:56 -04:00
|
|
|
*/
|
2012-05-23 15:17:49 +02:00
|
|
|
def withBody(body: Any): CamelMessage = {
  // Headers are carried over unchanged; only the body is replaced.
  copy(body, this.headers)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a new CamelMessage with given <code>headers</code>.
|
2012-01-19 14:38:44 +00:00
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def withHeaders[A](headers: Map[String, A]): CamelMessage = {
  // The body is carried over unchanged; the given headers replace the current ones.
  CamelMessage(this.body, headers)
}
|
2012-01-19 14:38:44 +00:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a new CamelMessage with given <code>headers</code>. A copy of the headers map is made.
|
2011-05-23 11:37:56 -04:00
|
|
|
* <p>
|
|
|
|
|
* Java API
|
|
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def withHeaders[A](headers: JMap[String, A]): CamelMessage = {
  // Copy the Java map into an immutable Scala map, then delegate to the Scala overload.
  val scalaHeaders: Map[String, A] = headers.toMap
  withHeaders(scalaHeaders)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a new CamelMessage with given <code>headers</code> added to the current headers.
|
2011-05-23 11:37:56 -04:00
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def addHeaders[A](headers: Map[String, A]): CamelMessage = {
  // Right-hand entries win: a given header replaces an existing one with the same name.
  val merged: Map[String, Any] = this.headers ++ headers
  CamelMessage(this.body, merged)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a new CamelMessage with given <code>headers</code> added to the current headers.
|
2012-01-19 14:38:44 +00:00
|
|
|
* A copy of the headers map is made.
|
2011-05-23 11:37:56 -04:00
|
|
|
* <p>
|
|
|
|
|
* Java API
|
|
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def addHeaders[A](headers: JMap[String, A]): CamelMessage = {
  // Copy the Java map into an immutable Scala map, then delegate to the Scala overload.
  val scalaHeaders: Map[String, A] = headers.toMap
  addHeaders(scalaHeaders)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a new CamelMessage with the given <code>header</code> added to the current headers.
|
2011-05-23 11:37:56 -04:00
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def addHeader(header: (String, Any)): CamelMessage = {
  // Adds the (name, value) pair, replacing any existing header with the same name.
  val extended: Map[String, Any] = this.headers + header
  CamelMessage(this.body, extended)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a new CamelMessage with the given header, represented by <code>name</code> and
|
2012-01-19 14:38:44 +00:00
|
|
|
* <code>value</code> added to the existing headers.
|
|
|
|
|
* <p>
|
|
|
|
|
* Java API
|
2011-05-23 11:37:56 -04:00
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def addHeader(name: String, value: Any): CamelMessage = {
  // Pair the arguments up and delegate to the tuple-based overload.
  val entry: (String, Any) = name -> value
  addHeader(entry)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a new CamelMessage where the header with given <code>headerName</code> is removed from
|
2012-01-19 14:38:44 +00:00
|
|
|
* the existing headers.
|
2011-05-23 11:37:56 -04:00
|
|
|
*/
|
2012-05-23 15:17:49 +02:00
|
|
|
def withoutHeader(headerName: String): CamelMessage = {
  // Removing a name that is not present simply yields the same set of headers.
  val remaining: Map[String, Any] = this.headers - headerName
  CamelMessage(this.body, remaining)
}
|
2012-01-19 14:38:44 +00:00
|
|
|
|
2012-05-23 15:17:49 +02:00
|
|
|
/**
 * Copies the body and all headers of this message into the given Camel message.
 * The body of <code>to</code> is overwritten; each header of this message is put
 * into the headers map of <code>to</code>.
 */
def copyContentTo(to: JCamelMessage): Unit = {
  to.setBody(this.body)
  this.headers foreach {
    // The target's headers map is typed with AnyRef values, hence the cast.
    case (headerName, headerValue) ⇒ to.getHeaders.put(headerName, headerValue.asInstanceOf[AnyRef])
  }
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-01-19 14:38:44 +00:00
|
|
|
* Returns the body of the message converted to the type <code>T</code>. Conversion is done
|
2012-03-18 10:46:08 +01:00
|
|
|
* using Camel's type converter. The type converter is obtained from the CamelContext that is passed in.
|
|
|
|
|
* The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
|
|
|
|
|
* using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
|
2011-05-23 11:37:56 -04:00
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def bodyAs[T](implicit m: Manifest[T], camelContext: CamelContext): T = {
  // Derive the runtime class of T from its manifest and delegate to the Java API.
  val target = m.erasure.asInstanceOf[Class[T]]
  getBodyAs(target, camelContext)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-01-19 14:38:44 +00:00
|
|
|
* Returns the body of the message converted to the type as given by the <code>clazz</code>
|
|
|
|
|
* parameter. Conversion is done using Camel's type converter. The type converter is obtained
|
2012-03-18 10:46:08 +01:00
|
|
|
* from the CamelContext that is passed in.
|
|
|
|
|
* <p>
|
|
|
|
|
* The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
|
|
|
|
|
* using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
|
2011-05-23 11:37:56 -04:00
|
|
|
* <p>
|
|
|
|
|
* Java API
|
2012-01-19 14:38:44 +00:00
|
|
|
*
|
2011-05-23 11:37:56 -04:00
|
|
|
*/
|
2012-05-23 15:17:49 +02:00
|
|
|
def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = {
  // Conversion is delegated to the type converter of the given CamelContext.
  val converter = camelContext.getTypeConverter
  converter.mandatoryConvertTo[T](clazz, body)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a CamelMessage with current <code>body</code> converted to type <code>T</code>.
|
2012-03-18 10:46:08 +01:00
|
|
|
* The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
|
|
|
|
|
* using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
|
2011-05-23 11:37:56 -04:00
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def withBodyAs[T](implicit m: Manifest[T], camelContext: CamelContext): CamelMessage = {
  // Derive the runtime class of T from its manifest and delegate to the class-based overload.
  val target = m.erasure.asInstanceOf[Class[T]]
  withBodyAs(target)(camelContext)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a CamelMessage with current <code>body</code> converted to type <code>clazz</code>.
|
2011-05-23 11:37:56 -04:00
|
|
|
* <p>
|
2012-03-18 10:46:08 +01:00
|
|
|
* The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
|
|
|
|
|
* using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
|
|
|
|
|
* <p>
|
2011-05-23 11:37:56 -04:00
|
|
|
* Java API
|
|
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def withBodyAs[T](clazz: Class[T])(implicit camelContext: CamelContext): CamelMessage = {
  // Convert the current body first, then build the new message around the result.
  val converted: T = getBodyAs(clazz, camelContext)
  withBody(converted)
}
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-01-19 14:38:44 +00:00
|
|
|
* Returns the header with given <code>name</code> converted to type <code>T</code>, wrapped in an <code>Option</code>. Returns <code>None</code> rather than throwing
|
|
|
|
|
* <code>NoSuchElementException</code> if the header doesn't exist.
|
2012-03-18 10:46:08 +01:00
|
|
|
* <p>
|
|
|
|
|
* The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
|
|
|
|
|
* using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
|
|
|
|
|
*
|
2011-05-23 11:37:56 -04:00
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def headerAs[T](name: String)(implicit m: Manifest[T], camelContext: CamelContext): Option[T] = {
  val target = m.erasure.asInstanceOf[Class[T]]
  // The type converter is only consulted when the header is actually present.
  for (value ← header(name)) yield camelContext.getTypeConverter.mandatoryConvertTo[T](target, value)
}
|
2012-01-19 14:38:44 +00:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns the header with given <code>name</code> converted to type as given by the <code>clazz</code>
|
|
|
|
|
* parameter. Throws <code>NoSuchElementException</code> if the header doesn't exist.
|
|
|
|
|
* <p>
|
2012-03-18 10:46:08 +01:00
|
|
|
* The CamelContext is accessible in a [[akka.camel.javaapi.UntypedConsumerActor]] and [[akka.camel.javaapi.UntypedProducerActor]]
|
|
|
|
|
* using the `getCamelContext` method, and is available on the [[akka.camel.CamelExtension]].
|
|
|
|
|
* <p>
|
2012-01-19 14:38:44 +00:00
|
|
|
* Java API
|
|
|
|
|
*/
|
2012-05-23 15:17:49 +02:00
|
|
|
def getHeaderAs[T](name: String, clazz: Class[T], camelContext: CamelContext): T = {
  // Reuse the Scala API by turning the Java class token into the Manifest it expects.
  val converted: Option[T] = headerAs[T](name)(Manifest.classType(clazz), camelContext)
  // Still fails with NoSuchElementException for a missing header, but the message now
  // names the header instead of the uninformative "None.get" produced by Option.get.
  converted.getOrElse(throw new NoSuchElementException("header not found: " + name))
}
|
2012-01-19 14:38:44 +00:00
|
|
|
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Companion object of CamelMessage class.
|
2011-05-23 11:37:56 -04:00
|
|
|
*
|
|
|
|
|
* @author Martin Krasser
|
|
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
object CamelMessage {
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* CamelMessage header to correlate request with response messages. Applications that send
|
2011-05-23 11:37:56 -04:00
|
|
|
* messages to a Producer actor may want to set this header on the request message
|
|
|
|
|
* so that it can be correlated with an asynchronous response. Messages sent to Consumer
|
|
|
|
|
* actors have this header already set.
|
|
|
|
|
*/
|
2012-05-25 00:49:45 +02:00
|
|
|
// Name of the header used to correlate request and response messages.
// NOTE(review): Scala only treats `final val` definitions without a type ascription as
// inlinable constant values - confirm whether `final` was intended here.
val MessageExchangeId = "MessageExchangeId" //Deliberately without type ascription to make it a constant
|
2011-05-23 11:37:56 -04:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Creates a canonical form of the given message <code>msg</code>. If <code>msg</code> is of type
|
2012-03-01 17:32:10 +01:00
|
|
|
* CamelMessage then <code>msg</code> is returned, otherwise <code>msg</code> is set as body of a
|
|
|
|
|
* newly created CamelMessage object.
|
2011-05-23 11:37:56 -04:00
|
|
|
*/
|
|
|
|
|
def canonicalize(msg: Any): CamelMessage = msg match {
  // Already in canonical form: hand back the very same instance.
  case alreadyCanonical: CamelMessage ⇒ alreadyCanonical
  // Anything else (including null) becomes the body of a header-less message.
  case payload                        ⇒ CamelMessage(payload, Map.empty[String, Any])
}
|
2012-01-19 14:38:44 +00:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a new CamelMessage object from the Camel message.
|
2012-01-19 14:38:44 +00:00
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def from(camelMessage: JCamelMessage): CamelMessage = {
  // No additional headers: only those of the Camel message itself are used.
  val noExtraHeaders = Map.empty[String, Any]
  from(camelMessage, noExtraHeaders)
}
|
2012-01-19 14:38:44 +00:00
|
|
|
|
|
|
|
|
/**
|
2012-03-01 17:32:10 +01:00
|
|
|
* Creates a new CamelMessage object from the Camel message.
|
2012-01-19 14:38:44 +00:00
|
|
|
*
|
2012-03-01 17:32:10 +01:00
|
|
|
* @param headers additional headers to set on the created CamelMessage in addition to those
|
2012-01-19 14:38:44 +00:00
|
|
|
* in the Camel message.
|
|
|
|
|
*/
|
2012-03-01 17:32:10 +01:00
|
|
|
def from(camelMessage: JCamelMessage, headers: Map[String, Any]): CamelMessage = {
  val payload = camelMessage.getBody
  // Headers of the Camel message are appended last, so on a name clash they
  // take precedence over the additional headers passed in.
  val merged: Map[String, Any] = headers ++ camelMessage.getHeaders
  CamelMessage(payload, merged)
}
|
2012-01-19 14:38:44 +00:00
|
|
|
|
2011-05-23 11:37:56 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2012-01-19 14:38:44 +00:00
|
|
|
* Positive acknowledgement message (used for application-acknowledged message receipts).
|
2012-07-18 08:06:07 +02:00
|
|
|
* When `autoAck` is set to false in the [[akka.camel.Consumer]], you can send an `Ack` to the sender of the CamelMessage.
|
2011-05-23 11:37:56 -04:00
|
|
|
* @author Martin Krasser
|
|
|
|
|
*/
|
|
|
|
|
case object Ack {

  /**
   * Java API: returns the Ack singleton.
   */
  def getInstance: Ack.type = this
}
|
|
|
|
|
|
2012-05-07 14:18:06 +02:00
|
|
|
/**
|
|
|
|
|
* An exception indicating that the exchange to the camel endpoint failed.
|
|
|
|
|
* It contains the failure cause obtained from Exchange.getException and the headers from either the Exchange.getIn
|
|
|
|
|
* message or Exchange.getOut message, depending on the exchange pattern.
|
|
|
|
|
*
|
|
|
|
|
*/
|
2012-05-23 15:17:49 +02:00
|
|
|
class AkkaCamelException private[akka] (cause: Throwable, val headers: Map[String, Any])
  extends AkkaException(cause.getMessage, cause) {

  /**
   * Creates the exception from the failure cause alone, with an empty headers map.
   */
  def this(cause: Throwable) = this(cause, Map.empty[String, Any])
}
|