Merge branch '457-krasserm'

This commit is contained in:
Martin Krasser 2010-10-09 18:59:35 +02:00
commit 698524dc2b
11 changed files with 292 additions and 63 deletions

View file

@ -159,7 +159,7 @@ object CamelServiceManager {
*/
def mandatoryService =
if (_current.isDefined) _current.get
else throw new IllegalStateException("co current Camel service")
else throw new IllegalStateException("co current CamelService")
/**
* Returns <code>Some(CamelService)</code> (containing the current CamelService)

View file

@ -1,30 +1,35 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.camel
import java.util.{Map => JMap, Set => JSet}
import scala.collection.JavaConversions._
import org.apache.camel.{Exchange, Message => CamelMessage}
import org.apache.camel.util.ExchangeHelper
import se.scalablesolutions.akka.japi.{Function => JFunction}
/**
* An immutable representation of a Camel message. Actor classes that mix in
* se.scalablesolutions.akka.camel.Producer or
* se.scalablesolutions.akka.camel.Consumer usually use this message type for communication.
* An immutable representation of a Camel message.
*
* @author Martin Krasser
*/
case class Message(val body: Any, val headers: Map[String, Any] = Map.empty) {
/**
* Returns the body of the message converted to the type given by the <code>clazz</code>
* argument. Conversion is done using Camel's type converter. The type converter is obtained
* from the CamelContext managed by CamelContextManager. Applications have to ensure proper
* initialization of CamelContextManager.
*
* @see CamelContextManager.
* Creates a Message with given body and empty headers map.
*/
def bodyAs[T](clazz: Class[T]): T =
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
def this(body: Any) = this(body, Map.empty[String, Any])
/**
* Creates a Message with given body and headers map. A copy of the headers map is made.
* <p>
* Java API
*/
def this(body: Any, headers: JMap[String, Any]) = this(body, headers.toMap)
/**
* Returns the body of the message converted to the type <code>T</code>. Conversion is done
@ -34,76 +39,143 @@ case class Message(val body: Any, val headers: Map[String, Any] = Map.empty) {
*
* @see CamelContextManager.
*/
def bodyAs[T](implicit m: Manifest[T]): T =
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](m.erasure.asInstanceOf[Class[T]], body)
def bodyAs[T](implicit m: Manifest[T]): T = getBodyAs(m.erasure.asInstanceOf[Class[T]])
/**
* Returns the body of the message converted to the type as given by the <code>clazz</code>
* parameter. Conversion is done using Camel's type converter. The type converter is obtained
* from the CamelContext managed by CamelContextManager. Applications have to ensure proper
* initialization of CamelContextManager.
* <p>
* Java API
*
* @see CamelContextManager.
*/
def getBodyAs[T](clazz: Class[T]): T =
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
/**
* Returns those headers from this message whose name is contained in <code>names</code>.
*/
def headers(names: Set[String]): Map[String, Any] = headers.filter(names contains _._1)
/**
* Returns those headers from this message whose name is contained in <code>names</code>.
* The returned headers map is backed up by an immutable headers map. Any attempt to modify
* the returned map will throw an exception.
* <p>
* Java API
*/
def getHeaders(names: JSet[String]): JMap[String, Any] = headers.filter(names contains _._1)
/**
* Returns all headers from this message. The returned headers map is backed up by this
* message's immutable headers map. Any attempt to modify the returned map will throw an
* exception.
* <p>
* Java API
*/
def getHeaders: JMap[String, Any] = headers
/**
* Returns the header with given <code>name</code>. Throws <code>NoSuchElementException</code>
* if the header doesn't exist.
*/
def header(name: String): Any = headers(name)
/**
* Returns the header with given <code>name</code>. Throws <code>NoSuchElementException</code>
* if the header doesn't exist.
* <p>
* Java API
*/
def getHeader(name: String): Any = header(name)
/**
* Returns the header with given <code>name</code> converted to type <code>T</code>. Throws
* <code>NoSuchElementException</code> if the header doesn't exist.
*/
def headerAs[T](name: String)(implicit m: Manifest[T]): T =
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](m.erasure.asInstanceOf[Class[T]], header(name))
getHeaderAs(name, m.erasure.asInstanceOf[Class[T]])
/**
* Returns the header with given <code>name</code> converted to type given by the <code>clazz</code>
* argument. Throws <code>NoSuchElementException</code> if the header doesn't exist.
* Returns the header with given <code>name</code> converted to type as given by the <code>clazz</code>
* parameter. Throws <code>NoSuchElementException</code> if the header doesn't exist.
* <p>
* Java API
*/
def headerAs[T](name: String, clazz: Class[T]): T =
def getHeaderAs[T](name: String, clazz: Class[T]): T =
CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](clazz, header(name))
/**
* Creates a Message with a new <code>body</code> using a <code>transformer</code> function.
* Creates a Message with a transformed body using a <code>transformer</code> function.
*/
def transformBody[A](transformer: A => Any): Message = setBody(transformer(body.asInstanceOf[A]))
/**
* Creates a Message with a new <code>body</code> converted to type <code>clazz</code>.
*
* @see Message#bodyAs(Class)
* Creates a Message with a transformed body using a <code>transformer</code> function.
* <p>
* Java API
*/
@deprecated("use setBodyAs[T](implicit m: Manifest[T]): Message instead")
def setBodyAs[T](clazz: Class[T]): Message = setBody(bodyAs(clazz))
def transformBody[A](transformer: JFunction[A, Any]): Message = setBody(transformer(body.asInstanceOf[A]))
/**
* Creates a Message with a new <code>body</code> converted to type <code>T</code>.
*
* @see Message#bodyAs(Class)
* Creates a Message with current <code>body</code> converted to type <code>T</code>.
*/
def setBodyAs[T](implicit m: Manifest[T]): Message = setBody(bodyAs[T])
def setBodyAs[T](implicit m: Manifest[T]): Message = setBodyAs(m.erasure.asInstanceOf[Class[T]])
/**
* Creates a Message with a new <code>body</code>.
* Creates a Message with current <code>body</code> converted to type <code>clazz</code>.
* <p>
* Java API
*/
def setBodyAs[T](clazz: Class[T]): Message = setBody(getBodyAs(clazz))
/**
* Creates a Message with a given <code>body</code>.
*/
def setBody(body: Any) = new Message(body, this.headers)
/**
* Creates a new Message with new <code>headers</code>.
* Creates a new Message with given <code>headers</code>.
*/
def setHeaders(headers: Map[String, Any]) = copy(this.body, headers)
def setHeaders(headers: Map[String, Any]): Message = copy(this.body, headers)
/**
* Creates a new Message with the <code>headers</code> argument added to the existing headers.
* Creates a new Message with given <code>headers</code>. A copy of the headers map is made.
* <p>
* Java API
*/
def addHeaders(headers: Map[String, Any]) = copy(this.body, this.headers ++ headers)
def setHeaders(headers: JMap[String, Any]): Message = setHeaders(headers.toMap)
/**
* Creates a new Message with the <code>header</code> argument added to the existing headers.
* Creates a new Message with given <code>headers</code> added to the current headers.
*/
def addHeader(header: (String, Any)) = copy(this.body, this.headers + header)
def addHeaders(headers: Map[String, Any]): Message = copy(this.body, this.headers ++ headers)
/**
* Creates a new Message where the header with name <code>headerName</code> is removed from
* Creates a new Message with given <code>headers</code> added to the current headers.
* A copy of the headers map is made.
* <p>
* Java API
*/
def addHeaders(headers: JMap[String, Any]): Message = addHeaders(headers.toMap)
/**
* Creates a new Message with the given <code>header</code> added to the current headers.
*/
def addHeader(header: (String, Any)): Message = copy(this.body, this.headers + header)
/**
* Creates a new Message with the given header, represented by <code>name</code> and
* <code>value</code> added to the existing headers.
* <p>
* Java API
*/
def addHeader(name: String, value: Any): Message = addHeader((name, value))
/**
* Creates a new Message where the header with given <code>headerName</code> is removed from
* the existing headers.
*/
def removeHeader(headerName: String) = copy(this.body, this.headers - headerName)
@ -127,7 +199,7 @@ object Message {
/**
* Creates a new Message with <code>body</code> as message body and an empty header map.
*/
def apply(body: Any) = new Message(body)
//def apply(body: Any) = new Message(body)
/**
* Creates a canonical form of the given message <code>msg</code>. If <code>msg</code> of type
@ -147,15 +219,43 @@ object Message {
*
* @author Martin Krasser
*/
case class Failure(val cause: Exception, val headers: Map[String, Any] = Map.empty)
case class Failure(val cause: Exception, val headers: Map[String, Any] = Map.empty) {
/**
* Creates a Failure with cause body and empty headers map.
*/
def this(cause: Exception) = this(cause, Map.empty[String, Any])
/**
* Creates a Failure with given cause and headers map. A copy of the headers map is made.
* <p>
* Java API
*/
def this(cause: Exception, headers: JMap[String, Any]) = this(cause, headers.toMap)
/**
* Returns the cause of this Failure.
* <p>
* Java API.
*/
def getCause = cause
/**
* Returns all headers from this failure message. The returned headers map is backed up by
* this message's immutable headers map. Any attempt to modify the returned map will throw
* an exception.
* <p>
* Java API
*/
def getHeaders: JMap[String, Any] = headers
}
/**
* Adapter for converting an org.apache.camel.Exchange to and from Message and Failure objects.
* Adapter for converting an org.apache.camel.Exchange to and from Message and Failure objects.
*
* @author Martin Krasser
* @author Martin Krasser
*/
class CamelExchangeAdapter(exchange: Exchange) {
import CamelMessageConversion.toMessageAdapter
/**
@ -256,10 +356,7 @@ class CamelMessageAdapter(val cm: CamelMessage) {
*/
def toMessage(headers: Map[String, Any]): Message = Message(cm.getBody, cmHeaders(headers, cm))
import scala.collection.JavaConversions._
private def cmHeaders(headers: Map[String, Any], cm: CamelMessage) =
headers ++ cm.getHeaders
private def cmHeaders(headers: Map[String, Any], cm: CamelMessage) = headers ++ cm.getHeaders
}
/**

View file

@ -0,0 +1,129 @@
package se.scalablesolutions.akka.camel;
import org.apache.camel.NoTypeConversionAvailableException;
import org.junit.BeforeClass;
import org.junit.Test;
import se.scalablesolutions.akka.camel.CamelContextManager;
import se.scalablesolutions.akka.camel.Message;
import se.scalablesolutions.akka.japi.Function;
import java.io.InputStream;
import java.util.*;
import static org.junit.Assert.*;
/**
* @author Martin Krasser
*/
public class MessageJavaTestBase {
@BeforeClass
public static void setUpBeforeClass() {
CamelContextManager.init();
}
@Test public void shouldConvertDoubleBodyToString() {
assertEquals("1.4", new Message("1.4").getBodyAs(String.class));
}
@Test(expected=NoTypeConversionAvailableException.class)
public void shouldThrowExceptionWhenConvertingDoubleBodyToInputStream() {
new Message(1.4).getBodyAs(InputStream.class);
}
@Test public void shouldReturnDoubleHeader() {
Message message = new Message("test" , createMap("test", 1.4));
assertEquals(1.4, message.getHeader("test"));
}
@Test public void shouldConvertDoubleHeaderToString() {
Message message = new Message("test" , createMap("test", 1.4));
assertEquals("1.4", message.getHeaderAs("test", String.class));
}
@Test public void shouldReturnSubsetOfHeaders() {
Message message = new Message("test" , createMap("A", "1", "B", "2"));
assertEquals(createMap("B", "2"), message.getHeaders(createSet("B")));
}
@Test(expected=UnsupportedOperationException.class)
public void shouldReturnSubsetOfHeadersUnmodifiable() {
Message message = new Message("test" , createMap("A", "1", "B", "2"));
message.getHeaders(createSet("B")).put("x", "y");
}
@Test public void shouldReturnAllHeaders() {
Message message = new Message("test" , createMap("A", "1", "B", "2"));
assertEquals(createMap("A", "1", "B", "2"), message.getHeaders());
}
@Test(expected=UnsupportedOperationException.class)
public void shouldReturnAllHeadersUnmodifiable() {
Message message = new Message("test" , createMap("A", "1", "B", "2"));
message.getHeaders().put("x", "y");
}
@Test public void shouldTransformBodyAndPreserveHeaders() {
assertEquals(
new Message("ab", createMap("A", "1")),
new Message("a" , createMap("A", "1")).transformBody((Function)new TestTransformer()));
}
@Test public void shouldConvertBodyAndPreserveHeaders() {
assertEquals(
new Message("1.4", createMap("A", "1")),
new Message(1.4 , createMap("A", "1")).setBodyAs(String.class));
}
@Test public void shouldSetBodyAndPreserveHeaders() {
assertEquals(
new Message("test2" , createMap("A", "1")),
new Message("test1" , createMap("A", "1")).setBody("test2"));
}
@Test public void shouldSetHeadersAndPreserveBody() {
assertEquals(
new Message("test1" , createMap("C", "3")),
new Message("test1" , createMap("A", "1")).setHeaders(createMap("C", "3")));
}
@Test public void shouldAddHeaderAndPreserveBodyAndHeaders() {
assertEquals(
new Message("test1" , createMap("A", "1", "B", "2")),
new Message("test1" , createMap("A", "1")).addHeader("B", "2"));
}
@Test public void shouldAddHeadersAndPreserveBodyAndHeaders() {
assertEquals(
new Message("test1" , createMap("A", "1", "B", "2")),
new Message("test1" , createMap("A", "1")).addHeaders(createMap("B", "2")));
}
@Test public void shouldRemoveHeadersAndPreserveBodyAndRemainingHeaders() {
assertEquals(
new Message("test1" , createMap("A", "1")),
new Message("test1" , createMap("A", "1", "B", "2")).removeHeader("B"));
}
private static Set<String> createSet(String... entries) {
HashSet<String> set = new HashSet<String>();
set.addAll(Arrays.asList(entries));
return set;
}
private static Map<String, Object> createMap(Object... pairs) {
HashMap<String, Object> map = new HashMap<String, Object>();
for (int i = 0; i < pairs.length; i += 2) {
map.put((String)pairs[i], pairs[i+1]);
}
return map;
}
private static class TestTransformer implements Function<String, String> {
public String apply(String param) {
return param + "b";
}
}
}

View file

@ -21,8 +21,8 @@ public class SampleRemoteUntypedConsumer extends RemoteUntypedConsumerActor {
public void onReceive(Object message) {
Message msg = (Message)message;
String body = msg.bodyAs(String.class);
String header = msg.headerAs("test", String.class);
String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class);
getContext().replySafe(String.format("%s %s", body, header));
}

View file

@ -13,8 +13,8 @@ public class SampleUntypedConsumer extends UntypedConsumerActor {
public void onReceive(Object message) {
Message msg = (Message)message;
String body = msg.bodyAs(String.class);
String header = msg.headerAs("test", String.class);
String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class);
getContext().replySafe(String.format("%s %s", body, header));
}

View file

@ -15,8 +15,8 @@ public class SampleUntypedConsumerBlocking extends UntypedConsumerActor {
public void onReceive(Object message) {
Message msg = (Message)message;
String body = msg.bodyAs(String.class);
String header = msg.headerAs("test", String.class);
String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class);
getContext().replySafe(String.format("%s %s", body, header));
}

View file

@ -12,7 +12,7 @@ public class SampleUntypedForwardingProducer extends UntypedProducerActor {
@Override
public void onReceiveAfterProduce(Object message) {
Message msg = (Message)message;
String body = msg.bodyAs(String.class);
String body = msg.getBodyAs(String.class);
CamelContextManager.getMandatoryTemplate().sendBody("direct:forward-test-1", body);
}
}

View file

@ -0,0 +1,5 @@
package se.scalablesolutions.akka.camel
import org.scalatest.junit.JUnitSuite
class MessageJavaTest extends MessageJavaTestBase with JUnitSuite

View file

@ -10,17 +10,16 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitSuite
class MessageTest extends JUnitSuite with BeforeAndAfterAll {
class MessageScalaTest extends JUnitSuite with BeforeAndAfterAll {
override protected def beforeAll = CamelContextManager.init
@Test def shouldConvertDoubleBodyToString = {
assertEquals("1.4", Message(1.4, null).bodyAs[String])
assertEquals("1.4", Message(1.4, null).bodyAs(classOf[String]))
assertEquals("1.4", Message(1.4).bodyAs[String])
}
@Test def shouldThrowExceptionWhenConvertingDoubleBodyToInputStream {
intercept[NoTypeConversionAvailableException] {
Message(1.4, null).bodyAs[InputStream]
Message(1.4).bodyAs[InputStream]
}
}
@ -32,7 +31,6 @@ class MessageTest extends JUnitSuite with BeforeAndAfterAll {
@Test def shouldConvertDoubleHeaderToString = {
val message = Message("test" , Map("test" -> 1.4))
assertEquals("1.4", message.headerAs[String]("test"))
assertEquals("1.4", message.headerAs("test", classOf[String]))
}
@Test def shouldReturnSubsetOfHeaders = {
@ -43,7 +41,7 @@ class MessageTest extends JUnitSuite with BeforeAndAfterAll {
@Test def shouldTransformBodyAndPreserveHeaders = {
assertEquals(
Message("ab", Map("A" -> "1")),
Message("a" , Map("A" -> "1")).transformBody[String](body => body + "b"))
Message("a" , Map("A" -> "1")).transformBody((body: String) => body + "b"))
}
@Test def shouldConvertBodyAndPreserveHeaders = {

View file

@ -246,7 +246,7 @@ object ProducerFeatureTest {
class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer {
def endpointUri = uri
override protected def receiveBeforeProduce = {
case msg: Message => if (upper) msg.transformBody[String] { _.toUpperCase } else msg
case msg: Message => if (upper) msg.transformBody { body: String => body.toUpperCase } else msg
}
}
@ -261,7 +261,7 @@ object ProducerFeatureTest {
protected def receive = {
case msg: Message => msg.body match {
case "fail" => self.reply(Failure(new Exception("failure"), msg.headers))
case _ => self.reply(msg.transformBody[String] { "received %s" format _ })
case _ => self.reply(msg.transformBody { body: String => "received %s" format body })
}
}
}

View file

@ -84,7 +84,7 @@ class Consumer5 extends Actor with Consumer with Logging {
class Transformer(producer: ActorRef) extends Actor {
protected def receive = {
case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _))
case msg: Message => producer.forward(msg.transformBody( (body: String) => "- %s -" format body))
}
}
@ -137,7 +137,7 @@ class HttpProducer(transformer: ActorRef) extends Actor with Producer {
class HttpTransformer extends Actor {
protected def receive = {
case msg: Message => self.reply(msg.transformBody[String] {_ replaceAll ("Akka ", "AKKA ")})
case msg: Message => self.reply(msg.transformBody {body: String => body replaceAll ("Akka ", "AKKA ")})
case msg: Failure => self.reply(msg)
}
}