From 617478e9ecee84ad59146cea86897a6068909e7d Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Sat, 9 Oct 2010 10:16:47 +0200 Subject: [PATCH 1/2] Java API for Message and Failure classes --- akka-camel/src/main/scala/CamelService.scala | 2 +- akka-camel/src/main/scala/Message.scala | 187 +++++++++++++----- .../camel/SampleRemoteUntypedConsumer.java | 4 +- .../akka/camel/SampleUntypedConsumer.java | 4 +- .../camel/SampleUntypedConsumerBlocking.java | 4 +- .../SampleUntypedForwardingProducer.java | 2 +- ...ssageTest.scala => MessageScalaTest.scala} | 10 +- .../src/test/scala/ProducerFeatureTest.scala | 4 +- .../src/main/scala/Actors.scala | 4 +- 9 files changed, 158 insertions(+), 63 deletions(-) rename akka-camel/src/test/scala/{MessageTest.scala => MessageScalaTest.scala} (85%) diff --git a/akka-camel/src/main/scala/CamelService.scala b/akka-camel/src/main/scala/CamelService.scala index d53ff07dec..3795b8a7fb 100644 --- a/akka-camel/src/main/scala/CamelService.scala +++ b/akka-camel/src/main/scala/CamelService.scala @@ -159,7 +159,7 @@ object CamelServiceManager { */ def mandatoryService = if (_current.isDefined) _current.get - else throw new IllegalStateException("co current Camel service") + else throw new IllegalStateException("co current CamelService") /** * Returns Some(CamelService) (containing the current CamelService) diff --git a/akka-camel/src/main/scala/Message.scala b/akka-camel/src/main/scala/Message.scala index 7c503009e8..55fc4e58f9 100644 --- a/akka-camel/src/main/scala/Message.scala +++ b/akka-camel/src/main/scala/Message.scala @@ -1,30 +1,35 @@ /** * Copyright (C) 2009-2010 Scalable Solutions AB */ - package se.scalablesolutions.akka.camel +import java.util.{Map => JMap, Set => JSet} + +import scala.collection.JavaConversions._ + import org.apache.camel.{Exchange, Message => CamelMessage} import org.apache.camel.util.ExchangeHelper +import se.scalablesolutions.akka.japi.{Function => JFunction} + /** - * An immutable representation of a Camel message. Actor classes that mix in - * se.scalablesolutions.akka.camel.Producer or - * se.scalablesolutions.akka.camel.Consumer usually use this message type for communication. + * An immutable representation of a Camel message. * * @author Martin Krasser */ case class Message(val body: Any, val headers: Map[String, Any] = Map.empty) { + /** - * Returns the body of the message converted to the type given by the clazz - * argument. Conversion is done using Camel's type converter. The type converter is obtained - * from the CamelContext managed by CamelContextManager. Applications have to ensure proper - * initialization of CamelContextManager. - * - * @see CamelContextManager. + * Creates a Message with given body and empty headers map. */ - def bodyAs[T](clazz: Class[T]): T = - CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](clazz, body) + def this(body: Any) = this(body, Map.empty[String, Any]) + + /** + * Creates a Message with given body and headers map. A copy of the headers map is made. + *

+ * Java API + */ + def this(body: Any, headers: JMap[String, Any]) = this(body, headers.toMap) /** * Returns the body of the message converted to the type T. Conversion is done @@ -34,76 +39,143 @@ case class Message(val body: Any, val headers: Map[String, Any] = Map.empty) { * * @see CamelContextManager. */ - def bodyAs[T](implicit m: Manifest[T]): T = - CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](m.erasure.asInstanceOf[Class[T]], body) + def bodyAs[T](implicit m: Manifest[T]): T = getBodyAs(m.erasure.asInstanceOf[Class[T]]) + + /** + * Returns the body of the message converted to the type as given by the clazz + * parameter. Conversion is done using Camel's type converter. The type converter is obtained + * from the CamelContext managed by CamelContextManager. Applications have to ensure proper + * initialization of CamelContextManager. + *

+ * Java API + * + * @see CamelContextManager. + */ + def getBodyAs[T](clazz: Class[T]): T = + CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](clazz, body) /** * Returns those headers from this message whose name is contained in names. */ def headers(names: Set[String]): Map[String, Any] = headers.filter(names contains _._1) + /** + * Returns those headers from this message whose name is contained in names. + * The returned headers map is backed up by an immutable headers map. Any attempt to modify + * the returned map will throw an exception. + *

+ * Java API + */ + def getHeaders(names: JSet[String]): JMap[String, Any] = headers.filter(names contains _._1) + + /** + * Returns all headers from this message. The returned headers map is backed up by this + * message's immutable headers map. Any attempt to modify the returned map will throw an + * exception. + *

+ * Java API + */ + def getHeaders: JMap[String, Any] = headers + /** * Returns the header with given name. Throws NoSuchElementException * if the header doesn't exist. */ def header(name: String): Any = headers(name) + /** + * Returns the header with given name. Throws NoSuchElementException + * if the header doesn't exist. + *

+ * Java API + */ + def getHeader(name: String): Any = header(name) + /** * Returns the header with given name converted to type T. Throws * NoSuchElementException if the header doesn't exist. */ def headerAs[T](name: String)(implicit m: Manifest[T]): T = - CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](m.erasure.asInstanceOf[Class[T]], header(name)) + getHeaderAs(name, m.erasure.asInstanceOf[Class[T]]) /** - * Returns the header with given name converted to type given by the clazz - * argument. Throws NoSuchElementException if the header doesn't exist. + * Returns the header with given name converted to type as given by the clazz + * parameter. Throws NoSuchElementException if the header doesn't exist. + *

+ * Java API */ - def headerAs[T](name: String, clazz: Class[T]): T = + def getHeaderAs[T](name: String, clazz: Class[T]): T = CamelContextManager.mandatoryContext.getTypeConverter.mandatoryConvertTo[T](clazz, header(name)) /** - * Creates a Message with a new body using a transformer function. + * Creates a Message with a transformed body using a transformer function. */ def transformBody[A](transformer: A => Any): Message = setBody(transformer(body.asInstanceOf[A])) /** - * Creates a Message with a new body converted to type clazz. - * - * @see Message#bodyAs(Class) + * Creates a Message with a transformed body using a transformer function. + *

+ * Java API */ - @deprecated("use setBodyAs[T](implicit m: Manifest[T]): Message instead") - def setBodyAs[T](clazz: Class[T]): Message = setBody(bodyAs(clazz)) + def transformBody[A](transformer: JFunction[A, Any]): Message = setBody(transformer(body.asInstanceOf[A])) /** - * Creates a Message with a new body converted to type T. - * - * @see Message#bodyAs(Class) + * Creates a Message with current body converted to type T. */ - def setBodyAs[T](implicit m: Manifest[T]): Message = setBody(bodyAs[T]) + def setBodyAs[T](implicit m: Manifest[T]): Message = setBodyAs(m.erasure.asInstanceOf[Class[T]]) /** - * Creates a Message with a new body. + * Creates a Message with current body converted to type clazz. + *

+ * Java API + */ + def setBodyAs[T](clazz: Class[T]): Message = setBody(getBodyAs(clazz)) + + /** + * Creates a Message with a given body. */ def setBody(body: Any) = new Message(body, this.headers) /** - * Creates a new Message with new headers. + * Creates a new Message with given headers. */ - def setHeaders(headers: Map[String, Any]) = copy(this.body, headers) + def setHeaders(headers: Map[String, Any]): Message = copy(this.body, headers) /** - * Creates a new Message with the headers argument added to the existing headers. + * Creates a new Message with given headers. A copy of the headers map is made. + *

+ * Java API */ - def addHeaders(headers: Map[String, Any]) = copy(this.body, this.headers ++ headers) + def setHeaders(headers: JMap[String, Any]): Message = setHeaders(headers.toMap) /** - * Creates a new Message with the header argument added to the existing headers. + * Creates a new Message with given headers added to the current headers. */ - def addHeader(header: (String, Any)) = copy(this.body, this.headers + header) + def addHeaders(headers: Map[String, Any]): Message = copy(this.body, this.headers ++ headers) /** - * Creates a new Message where the header with name headerName is removed from + * Creates a new Message with given headers added to the current headers. + * A copy of the headers map is made. + *

+ * Java API + */ + def addHeaders(headers: JMap[String, Any]): Message = addHeaders(headers.toMap) + + /** + * Creates a new Message with the given header added to the current headers. + */ + def addHeader(header: (String, Any)): Message = copy(this.body, this.headers + header) + + /** + * Creates a new Message with the given header, represented by name and + * value added to the existing headers. + *

+ * Java API + */ + def addHeader(name: String, value: Any): Message = addHeader((name, value)) + + /** + * Creates a new Message where the header with given headerName is removed from * the existing headers. */ def removeHeader(headerName: String) = copy(this.body, this.headers - headerName) @@ -127,7 +199,7 @@ object Message { /** * Creates a new Message with body as message body and an empty header map. */ - def apply(body: Any) = new Message(body) + //def apply(body: Any) = new Message(body) /** * Creates a canonical form of the given message msg. If msg of type @@ -147,15 +219,43 @@ object Message { * * @author Martin Krasser */ -case class Failure(val cause: Exception, val headers: Map[String, Any] = Map.empty) +case class Failure(val cause: Exception, val headers: Map[String, Any] = Map.empty) { + + /** + * Creates a Failure with cause body and empty headers map. + */ + def this(cause: Exception) = this(cause, Map.empty[String, Any]) + + /** + * Creates a Failure with given cause and headers map. A copy of the headers map is made. + *

+ * Java API + */ + def this(cause: Exception, headers: JMap[String, Any]) = this(cause, headers.toMap) + + /** + * Returns the cause of this Failure. + *

+ * Java API. + */ + def getCause = cause + + /** + * Returns all headers from this failure message. The returned headers map is backed up by + * this message's immutable headers map. Any attempt to modify the returned map will throw + * an exception. + *

+ * Java API + */ + def getHeaders: JMap[String, Any] = headers +} /** - * Adapter for converting an org.apache.camel.Exchange to and from Message and Failure objects. + * Adapter for converting an org.apache.camel.Exchange to and from Message and Failure objects. * - * @author Martin Krasser + * @author Martin Krasser */ class CamelExchangeAdapter(exchange: Exchange) { - import CamelMessageConversion.toMessageAdapter /** @@ -256,10 +356,7 @@ class CamelMessageAdapter(val cm: CamelMessage) { */ def toMessage(headers: Map[String, Any]): Message = Message(cm.getBody, cmHeaders(headers, cm)) - import scala.collection.JavaConversions._ - - private def cmHeaders(headers: Map[String, Any], cm: CamelMessage) = - headers ++ cm.getHeaders + private def cmHeaders(headers: Map[String, Any], cm: CamelMessage) = headers ++ cm.getHeaders } /** diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleRemoteUntypedConsumer.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleRemoteUntypedConsumer.java index c35bd92b71..f9cc4ac71c 100644 --- a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleRemoteUntypedConsumer.java +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleRemoteUntypedConsumer.java @@ -21,8 +21,8 @@ public class SampleRemoteUntypedConsumer extends RemoteUntypedConsumerActor { public void onReceive(Object message) { Message msg = (Message)message; - String body = msg.bodyAs(String.class); - String header = msg.headerAs("test", String.class); + String body = msg.getBodyAs(String.class); + String header = msg.getHeaderAs("test", String.class); getContext().replySafe(String.format("%s %s", body, header)); } diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumer.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumer.java index 01b0edd251..60dbe453ef 100644 --- a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumer.java +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumer.java @@ -13,8 +13,8 @@ public class SampleUntypedConsumer extends UntypedConsumerActor { public void onReceive(Object message) { Message msg = (Message)message; - String body = msg.bodyAs(String.class); - String header = msg.headerAs("test", String.class); + String body = msg.getBodyAs(String.class); + String header = msg.getHeaderAs("test", String.class); getContext().replySafe(String.format("%s %s", body, header)); } diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumerBlocking.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumerBlocking.java index a8ce6f1484..2bc19a6d08 100644 --- a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumerBlocking.java +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedConsumerBlocking.java @@ -15,8 +15,8 @@ public class SampleUntypedConsumerBlocking extends UntypedConsumerActor { public void onReceive(Object message) { Message msg = (Message)message; - String body = msg.bodyAs(String.class); - String header = msg.headerAs("test", String.class); + String body = msg.getBodyAs(String.class); + String header = msg.getHeaderAs("test", String.class); getContext().replySafe(String.format("%s %s", body, header)); } diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedForwardingProducer.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedForwardingProducer.java index bfa34f42e5..84dfa8fb86 100644 --- a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedForwardingProducer.java +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/SampleUntypedForwardingProducer.java @@ -12,7 +12,7 @@ public class SampleUntypedForwardingProducer extends UntypedProducerActor { @Override public void onReceiveAfterProduce(Object message) { Message msg = (Message)message; - String body = msg.bodyAs(String.class); + String body = msg.getBodyAs(String.class); CamelContextManager.getMandatoryTemplate().sendBody("direct:forward-test-1", body); } } diff --git a/akka-camel/src/test/scala/MessageTest.scala b/akka-camel/src/test/scala/MessageScalaTest.scala similarity index 85% rename from akka-camel/src/test/scala/MessageTest.scala rename to akka-camel/src/test/scala/MessageScalaTest.scala index 1467402b9a..aedd9171cd 100644 --- a/akka-camel/src/test/scala/MessageTest.scala +++ b/akka-camel/src/test/scala/MessageScalaTest.scala @@ -10,17 +10,16 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.junit.JUnitSuite -class MessageTest extends JUnitSuite with BeforeAndAfterAll { +class MessageScalaTest extends JUnitSuite with BeforeAndAfterAll { override protected def beforeAll = CamelContextManager.init @Test def shouldConvertDoubleBodyToString = { - assertEquals("1.4", Message(1.4, null).bodyAs[String]) - assertEquals("1.4", Message(1.4, null).bodyAs(classOf[String])) + assertEquals("1.4", Message(1.4).bodyAs[String]) } @Test def shouldThrowExceptionWhenConvertingDoubleBodyToInputStream { intercept[NoTypeConversionAvailableException] { - Message(1.4, null).bodyAs[InputStream] + Message(1.4).bodyAs[InputStream] } } @@ -32,7 +31,6 @@ class MessageTest extends JUnitSuite with BeforeAndAfterAll { @Test def shouldConvertDoubleHeaderToString = { val message = Message("test" , Map("test" -> 1.4)) assertEquals("1.4", message.headerAs[String]("test")) - assertEquals("1.4", message.headerAs("test", classOf[String])) } @Test def shouldReturnSubsetOfHeaders = { @@ -43,7 +41,7 @@ class MessageTest extends JUnitSuite with BeforeAndAfterAll { @Test def shouldTransformBodyAndPreserveHeaders = { assertEquals( Message("ab", Map("A" -> "1")), - Message("a" , Map("A" -> "1")).transformBody[String](body => body + "b")) + Message("a" , Map("A" -> "1")).transformBody((body: String) => body + "b")) } @Test def shouldConvertBodyAndPreserveHeaders = { diff --git a/akka-camel/src/test/scala/ProducerFeatureTest.scala b/akka-camel/src/test/scala/ProducerFeatureTest.scala index 5f31bcbe1c..fefd6afbfe 100644 --- a/akka-camel/src/test/scala/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/ProducerFeatureTest.scala @@ -246,7 +246,7 @@ object ProducerFeatureTest { class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer { def endpointUri = uri override protected def receiveBeforeProduce = { - case msg: Message => if (upper) msg.transformBody[String] { _.toUpperCase } else msg + case msg: Message => if (upper) msg.transformBody { body: String => body.toUpperCase } else msg } } @@ -261,7 +261,7 @@ object ProducerFeatureTest { protected def receive = { case msg: Message => msg.body match { case "fail" => self.reply(Failure(new Exception("failure"), msg.headers)) - case _ => self.reply(msg.transformBody[String] { "received %s" format _ }) + case _ => self.reply(msg.transformBody { body: String => "received %s" format body }) } } } diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala index 64bdb19dfd..db2aab1729 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala @@ -84,7 +84,7 @@ class Consumer5 extends Actor with Consumer with Logging { class Transformer(producer: ActorRef) extends Actor { protected def receive = { - case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _)) + case msg: Message => producer.forward(msg.transformBody( (body: String) => "- %s -" format body)) } } @@ -137,7 +137,7 @@ class HttpProducer(transformer: ActorRef) extends Actor with Producer { class HttpTransformer extends Actor { protected def receive = { - case msg: Message => self.reply(msg.transformBody[String] {_ replaceAll ("Akka ", "AKKA ")}) + case msg: Message => self.reply(msg.transformBody {body: String => body replaceAll ("Akka ", "AKKA ")}) case msg: Failure => self.reply(msg) } } From 2a602a5f9de272afc8647531ed0682f0cedad95a Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Sat, 9 Oct 2010 10:17:42 +0200 Subject: [PATCH 2/2] Tests for Message Java API --- .../akka/camel/MessageJavaTestBase.java | 129 ++++++++++++++++++ .../src/test/scala/MessageJavaTest.scala | 5 + 2 files changed, 134 insertions(+) create mode 100644 akka-camel/src/test/java/se/scalablesolutions/akka/camel/MessageJavaTestBase.java create mode 100644 akka-camel/src/test/scala/MessageJavaTest.scala diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/MessageJavaTestBase.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/MessageJavaTestBase.java new file mode 100644 index 0000000000..0db2f2b214 --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/MessageJavaTestBase.java @@ -0,0 +1,129 @@ +package se.scalablesolutions.akka.camel; + +import org.apache.camel.NoTypeConversionAvailableException; +import org.junit.BeforeClass; +import org.junit.Test; + +import se.scalablesolutions.akka.camel.CamelContextManager; +import se.scalablesolutions.akka.camel.Message; +import se.scalablesolutions.akka.japi.Function; + +import java.io.InputStream; +import java.util.*; + +import static org.junit.Assert.*; + +/** + * @author Martin Krasser + */ +public class MessageJavaTestBase { + + @BeforeClass + public static void setUpBeforeClass() { + CamelContextManager.init(); + } + + @Test public void shouldConvertDoubleBodyToString() { + assertEquals("1.4", new Message("1.4").getBodyAs(String.class)); + } + + @Test(expected=NoTypeConversionAvailableException.class) + public void shouldThrowExceptionWhenConvertingDoubleBodyToInputStream() { + new Message(1.4).getBodyAs(InputStream.class); + } + + @Test public void shouldReturnDoubleHeader() { + Message message = new Message("test" , createMap("test", 1.4)); + assertEquals(1.4, message.getHeader("test")); + } + + @Test public void shouldConvertDoubleHeaderToString() { + Message message = new Message("test" , createMap("test", 1.4)); + assertEquals("1.4", message.getHeaderAs("test", String.class)); + } + + @Test public void shouldReturnSubsetOfHeaders() { + Message message = new Message("test" , createMap("A", "1", "B", "2")); + assertEquals(createMap("B", "2"), message.getHeaders(createSet("B"))); + } + + @Test(expected=UnsupportedOperationException.class) + public void shouldReturnSubsetOfHeadersUnmodifiable() { + Message message = new Message("test" , createMap("A", "1", "B", "2")); + message.getHeaders(createSet("B")).put("x", "y"); + } + + @Test public void shouldReturnAllHeaders() { + Message message = new Message("test" , createMap("A", "1", "B", "2")); + assertEquals(createMap("A", "1", "B", "2"), message.getHeaders()); + } + + @Test(expected=UnsupportedOperationException.class) + public void shouldReturnAllHeadersUnmodifiable() { + Message message = new Message("test" , createMap("A", "1", "B", "2")); + message.getHeaders().put("x", "y"); + } + + @Test public void shouldTransformBodyAndPreserveHeaders() { + assertEquals( + new Message("ab", createMap("A", "1")), + new Message("a" , createMap("A", "1")).transformBody((Function)new TestTransformer())); + } + + @Test public void shouldConvertBodyAndPreserveHeaders() { + assertEquals( + new Message("1.4", createMap("A", "1")), + new Message(1.4 , createMap("A", "1")).setBodyAs(String.class)); + } + + @Test public void shouldSetBodyAndPreserveHeaders() { + assertEquals( + new Message("test2" , createMap("A", "1")), + new Message("test1" , createMap("A", "1")).setBody("test2")); + } + + @Test public void shouldSetHeadersAndPreserveBody() { + assertEquals( + new Message("test1" , createMap("C", "3")), + new Message("test1" , createMap("A", "1")).setHeaders(createMap("C", "3"))); + } + + @Test public void shouldAddHeaderAndPreserveBodyAndHeaders() { + assertEquals( + new Message("test1" , createMap("A", "1", "B", "2")), + new Message("test1" , createMap("A", "1")).addHeader("B", "2")); + } + + @Test public void shouldAddHeadersAndPreserveBodyAndHeaders() { + assertEquals( + new Message("test1" , createMap("A", "1", "B", "2")), + new Message("test1" , createMap("A", "1")).addHeaders(createMap("B", "2"))); + } + + @Test public void shouldRemoveHeadersAndPreserveBodyAndRemainingHeaders() { + assertEquals( + new Message("test1" , createMap("A", "1")), + new Message("test1" , createMap("A", "1", "B", "2")).removeHeader("B")); + } + + private static Set createSet(String... entries) { + HashSet set = new HashSet(); + set.addAll(Arrays.asList(entries)); + return set; + } + + private static Map createMap(Object... pairs) { + HashMap map = new HashMap(); + for (int i = 0; i < pairs.length; i += 2) { + map.put((String)pairs[i], pairs[i+1]); + } + return map; + } + + private static class TestTransformer implements Function { + public String apply(String param) { + return param + "b"; + } + } + +} diff --git a/akka-camel/src/test/scala/MessageJavaTest.scala b/akka-camel/src/test/scala/MessageJavaTest.scala new file mode 100644 index 0000000000..80accd7432 --- /dev/null +++ b/akka-camel/src/test/scala/MessageJavaTest.scala @@ -0,0 +1,5 @@ +package se.scalablesolutions.akka.camel + +import org.scalatest.junit.JUnitSuite + +class MessageJavaTest extends MessageJavaTestBase with JUnitSuite