Merge pull request #894 from akka/wip-2724-streamcache-patriknw

Reset StreamCache in getBodyAs to be able to re-read it, see #2724
This commit is contained in:
Patrik Nordwall 2012-11-26 07:12:36 -08:00
commit e5fce9ee2a
3 changed files with 32 additions and 4 deletions

View file

@ -5,7 +5,7 @@
package akka.camel package akka.camel
import java.util.{ Map JMap, Set JSet } import java.util.{ Map JMap, Set JSet }
import org.apache.camel.{ CamelContext, Message JCamelMessage } import org.apache.camel.{ CamelContext, Message JCamelMessage, StreamCache }
import akka.AkkaException import akka.AkkaException
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.util.Try import scala.util.Try
@ -107,7 +107,21 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
* Java API * Java API
* *
*/ */
def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body) def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = {
val result = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
// to be able to re-read a StreamCache we must "undo" the side effect by resetting the StreamCache
resetStreamCache()
result
}
/**
* Reset StreamCache body. Nothing is done if the body is not a StreamCache.
* See http://camel.apache.org/stream-caching.html
*/
def resetStreamCache(): Unit = body match {
case stream: StreamCache stream.reset
case _
}
/** /**
* Returns a new CamelMessage with a new body, while keeping the same headers. * Returns a new CamelMessage with a new body, while keeping the same headers.

View file

@ -8,6 +8,7 @@ import akka.actor.ActorSystem;
import akka.dispatch.Mapper; import akka.dispatch.Mapper;
import akka.japi.Function; import akka.japi.Function;
import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.converter.stream.InputStreamCache;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -100,6 +101,14 @@ public class MessageJavaTestBase {
message("test1" , createMap("A", "1")).withHeaders(createMap("C", "3"))); message("test1" , createMap("A", "1")).withHeaders(createMap("C", "3")));
} }
@Test
public void shouldBeAbleToReReadStreamCacheBody() throws Exception {
CamelMessage msg = new CamelMessage(new InputStreamCache("test1".getBytes("utf-8")), empty);
assertEquals("test1", msg.getBodyAs(String.class, camel.context()));
// re-read
assertEquals("test1", msg.getBodyAs(String.class, camel.context()));
}
private static Set<String> createSet(String... entries) { private static Set<String> createSet(String... entries) {
HashSet<String> set = new HashSet<String>(); HashSet<String> set = new HashSet<String>();
set.addAll(Arrays.asList(entries)); set.addAll(Arrays.asList(entries));

View file

@ -5,11 +5,11 @@
package akka.camel package akka.camel
import java.io.InputStream import java.io.InputStream
import org.apache.camel.NoTypeConversionAvailableException import org.apache.camel.NoTypeConversionAvailableException
import akka.camel.TestSupport.{ SharedCamelSystem } import akka.camel.TestSupport.{ SharedCamelSystem }
import org.scalatest.FunSuite import org.scalatest.FunSuite
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import org.apache.camel.converter.stream.InputStreamCache
class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem { class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem {
implicit def camelContext = camel.context implicit def camelContext = camel.context
@ -44,12 +44,17 @@ class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem
test("mustSetBodyAndPreserveHeaders") { test("mustSetBodyAndPreserveHeaders") {
CamelMessage("test1", Map("A" -> "1")).copy(body = "test2") must be( CamelMessage("test1", Map("A" -> "1")).copy(body = "test2") must be(
CamelMessage("test2", Map("A" -> "1"))) CamelMessage("test2", Map("A" -> "1")))
} }
test("mustSetHeadersAndPreserveBody") { test("mustSetHeadersAndPreserveBody") {
CamelMessage("test1", Map("A" -> "1")).copy(headers = Map("C" -> "3")) must be( CamelMessage("test1", Map("A" -> "1")).copy(headers = Map("C" -> "3")) must be(
CamelMessage("test1", Map("C" -> "3"))) CamelMessage("test1", Map("C" -> "3")))
}
test("mustBeAbleToReReadStreamCacheBody") {
val msg = CamelMessage(new InputStreamCache("test1".getBytes("utf-8")), Map.empty)
msg.bodyAs[String] must be("test1")
// re-read
msg.bodyAs[String] must be("test1")
} }
} }