diff --git a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala index c569ae0997..c9dc32e597 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelMessage.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelMessage.scala @@ -5,7 +5,7 @@ package akka.camel import java.util.{ Map ⇒ JMap, Set ⇒ JSet } -import org.apache.camel.{ CamelContext, Message ⇒ JCamelMessage } +import org.apache.camel.{ CamelContext, Message ⇒ JCamelMessage, StreamCache } import akka.AkkaException import scala.reflect.ClassTag import scala.util.Try @@ -107,7 +107,21 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) { * Java API * */ - def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body) + def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = { + val result = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body) + // to be able to re-read a StreamCache we must "undo" the side effect by resetting the StreamCache + resetStreamCache() + result + } + + /** + * Reset StreamCache body. Nothing is done if the body is not a StreamCache. + * See http://camel.apache.org/stream-caching.html + */ + def resetStreamCache(): Unit = body match { + case stream: StreamCache ⇒ stream.reset + case _ ⇒ + } /** * Returns a new CamelMessage with a new body, while keeping the same headers. diff --git a/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java b/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java index 6b366b0a10..d805a8b2c1 100644 --- a/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java +++ b/akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java @@ -8,6 +8,7 @@ import akka.actor.ActorSystem; import akka.dispatch.Mapper; import akka.japi.Function; import org.apache.camel.NoTypeConversionAvailableException; +import org.apache.camel.converter.stream.InputStreamCache; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -100,6 +101,14 @@ public class MessageJavaTestBase { message("test1" , createMap("A", "1")).withHeaders(createMap("C", "3"))); } + @Test + public void shouldBeAbleToReReadStreamCacheBody() throws Exception { + CamelMessage msg = new CamelMessage(new InputStreamCache("test1".getBytes("utf-8")), empty); + assertEquals("test1", msg.getBodyAs(String.class, camel.context())); + // re-read + assertEquals("test1", msg.getBodyAs(String.class, camel.context())); + } + private static Set createSet(String... entries) { HashSet set = new HashSet(); set.addAll(Arrays.asList(entries)); diff --git a/akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala b/akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala index dd73027624..cbf0190e91 100644 --- a/akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala +++ b/akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala @@ -5,11 +5,11 @@ package akka.camel import java.io.InputStream - import org.apache.camel.NoTypeConversionAvailableException import akka.camel.TestSupport.{ SharedCamelSystem } import org.scalatest.FunSuite import org.scalatest.matchers.MustMatchers +import org.apache.camel.converter.stream.InputStreamCache class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem { implicit def camelContext = camel.context @@ -44,12 +44,17 @@ class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem test("mustSetBodyAndPreserveHeaders") { CamelMessage("test1", Map("A" -> "1")).copy(body = "test2") must be( CamelMessage("test2", Map("A" -> "1"))) - } test("mustSetHeadersAndPreserveBody") { CamelMessage("test1", Map("A" -> "1")).copy(headers = Map("C" -> "3")) must be( CamelMessage("test1", Map("C" -> "3"))) + } + test("mustBeAbleToReReadStreamCacheBody") { + val msg = CamelMessage(new InputStreamCache("test1".getBytes("utf-8")), Map.empty) + msg.bodyAs[String] must be("test1") + // re-read + msg.bodyAs[String] must be("test1") } }